1use sqlx::{Row, Sqlite, SqlitePool, Transaction};
6
7use punkgo_core::actor::{ActorRecord, ActorStatus, ActorType, CreateActorSpec, WritableTarget};
8use punkgo_core::errors::{KernelError, KernelResult};
9
10#[derive(Clone)]
11pub struct ActorStore {
12 pool: SqlitePool,
13}
14
15impl ActorStore {
16 pub fn new(pool: SqlitePool) -> Self {
17 Self { pool }
18 }
19
20 pub async fn get(&self, actor_id: &str) -> KernelResult<Option<ActorRecord>> {
22 let row = sqlx::query(
23 r#"
24 SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
25 writable_targets, energy_share, reduction_policy, created_at, updated_at
26 FROM actors
27 WHERE actor_id = ?1
28 "#,
29 )
30 .bind(actor_id)
31 .fetch_optional(&self.pool)
32 .await?;
33
34 match row {
35 Some(row) => Ok(Some(row_to_actor_record(&row)?)),
36 None => Ok(None),
37 }
38 }
39
40 pub async fn exists(&self, actor_id: &str) -> KernelResult<bool> {
42 let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
43 .bind(actor_id)
44 .fetch_optional(&self.pool)
45 .await?;
46 Ok(row.is_some())
47 }
48
49 pub async fn is_active(&self, actor_id: &str) -> KernelResult<bool> {
51 let row = sqlx::query("SELECT status FROM actors WHERE actor_id = ?1")
52 .bind(actor_id)
53 .fetch_optional(&self.pool)
54 .await?;
55 match row {
56 Some(row) => {
57 let status: String = row.get("status");
58 Ok(status == "active")
59 }
60 None => Ok(false),
61 }
62 }
63
64 pub async fn create_in_tx(
66 &self,
67 tx: &mut Transaction<'_, Sqlite>,
68 spec: &CreateActorSpec,
69 ) -> KernelResult<()> {
70 if spec.actor_id.trim().is_empty() {
71 return Err(KernelError::PolicyViolation(
72 "actor_id cannot be empty".to_string(),
73 ));
74 }
75
76 let existing = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
78 .bind(&spec.actor_id)
79 .fetch_optional(&mut **tx)
80 .await?;
81 if existing.is_some() {
82 return Err(KernelError::PolicyViolation(format!(
83 "actor already exists: {}",
84 spec.actor_id
85 )));
86 }
87
88 let lineage_json = serde_json::to_string(&spec.lineage).map_err(|e| {
89 KernelError::PolicyViolation(format!("failed to serialize lineage: {e}"))
90 })?;
91 let targets_json = serde_json::to_string(&spec.writable_targets).map_err(|e| {
92 KernelError::PolicyViolation(format!("failed to serialize writable_targets: {e}"))
93 })?;
94 let now = now_millis_string();
95
96 sqlx::query(
97 r#"
98 INSERT INTO actors (
99 actor_id, actor_type, creator_id, lineage, purpose, status,
100 writable_targets, energy_share, reduction_policy, created_at, updated_at
101 )
102 VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?7, ?8, ?9, ?9)
103 "#,
104 )
105 .bind(&spec.actor_id)
106 .bind(spec.actor_type.as_str())
107 .bind(&spec.creator_id)
108 .bind(&lineage_json)
109 .bind(&spec.purpose)
110 .bind(&targets_json)
111 .bind(spec.energy_share)
112 .bind(&spec.reduction_policy)
113 .bind(&now)
114 .execute(&mut **tx)
115 .await?;
116
117 Ok(())
118 }
119
120 pub async fn set_status_in_tx(
122 &self,
123 tx: &mut Transaction<'_, Sqlite>,
124 actor_id: &str,
125 status: &ActorStatus,
126 ) -> KernelResult<()> {
127 let result = sqlx::query(
128 r#"
129 UPDATE actors SET status = ?1, updated_at = ?2 WHERE actor_id = ?3
130 "#,
131 )
132 .bind(status.as_str())
133 .bind(now_millis_string())
134 .bind(actor_id)
135 .execute(&mut **tx)
136 .await?;
137
138 if result.rows_affected() == 0 {
139 return Err(KernelError::ActorNotFound(actor_id.to_string()));
140 }
141 Ok(())
142 }
143
144 pub async fn next_sequence(&self, creator_id: &str, purpose: &str) -> KernelResult<i64> {
147 let pattern = format!("{creator_id}/{purpose}/%");
149 let row = sqlx::query("SELECT COUNT(*) AS cnt FROM actors WHERE actor_id LIKE ?1")
150 .bind(&pattern)
151 .fetch_one(&self.pool)
152 .await?;
153 let count: i64 = row.get("cnt");
154 Ok(count + 1)
155 }
156
157 pub async fn list_active_with_shares(&self) -> KernelResult<Vec<(String, f64)>> {
160 let rows = sqlx::query(
161 r#"
162 SELECT actor_id, energy_share
163 FROM actors
164 WHERE status = 'active' AND energy_share > 0.0
165 "#,
166 )
167 .fetch_all(&self.pool)
168 .await?;
169
170 Ok(rows
171 .iter()
172 .map(|row| {
173 let actor_id: String = row.get("actor_id");
174 let share: f64 = row.get("energy_share");
175 (actor_id, share)
176 })
177 .collect())
178 }
179
180 pub async fn list_children(&self, parent_id: &str) -> KernelResult<Vec<ActorRecord>> {
182 let rows = sqlx::query(
183 r#"
184 SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
185 writable_targets, energy_share, reduction_policy, created_at, updated_at
186 FROM actors
187 WHERE creator_id = ?1
188 "#,
189 )
190 .bind(parent_id)
191 .fetch_all(&self.pool)
192 .await?;
193
194 rows.iter().map(row_to_actor_record).collect()
195 }
196
197 pub async fn list_descendants(&self, ancestor_id: &str) -> KernelResult<Vec<ActorRecord>> {
203 let lineage_pattern = format!("%\"{}\"%", ancestor_id);
204 let rows = sqlx::query(
205 r#"
206 SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
207 writable_targets, energy_share, reduction_policy, created_at, updated_at
208 FROM actors
209 WHERE lineage LIKE ?1 AND status = 'active'
210 "#,
211 )
212 .bind(&lineage_pattern)
213 .fetch_all(&self.pool)
214 .await?;
215
216 rows.iter().map(row_to_actor_record).collect()
217 }
218}
219
220fn row_to_actor_record(row: &sqlx::sqlite::SqliteRow) -> KernelResult<ActorRecord> {
225 let actor_type_str: String = row.get("actor_type");
226 let status_str: String = row.get("status");
227 let lineage_json: String = row.get("lineage");
228 let targets_json: String = row.get("writable_targets");
229 let creator_id: Option<String> = row.get("creator_id");
230 let purpose: Option<String> = row.get("purpose");
231
232 let actor_type = ActorType::parse(&actor_type_str).ok_or_else(|| {
233 KernelError::PolicyViolation(format!("invalid actor_type in DB: {actor_type_str}"))
234 })?;
235 let status = ActorStatus::parse(&status_str).ok_or_else(|| {
236 KernelError::PolicyViolation(format!("invalid actor status in DB: {status_str}"))
237 })?;
238 let lineage: Vec<String> = serde_json::from_str(&lineage_json)
239 .map_err(|e| KernelError::PolicyViolation(format!("invalid lineage JSON: {e}")))?;
240 let writable_targets: Vec<WritableTarget> = serde_json::from_str(&targets_json)
241 .map_err(|e| KernelError::PolicyViolation(format!("invalid writable_targets JSON: {e}")))?;
242
243 Ok(ActorRecord {
244 actor_id: row.get("actor_id"),
245 actor_type,
246 creator_id,
247 lineage,
248 purpose,
249 status,
250 writable_targets,
251 energy_share: row.get("energy_share"),
252 reduction_policy: row.get("reduction_policy"),
253 created_at: row.get("created_at"),
254 updated_at: row.get("updated_at"),
255 })
256}
257
258fn now_millis_string() -> String {
259 let now = std::time::SystemTime::now()
260 .duration_since(std::time::UNIX_EPOCH)
261 .unwrap_or_default();
262 now.as_millis().to_string()
263}