1use sqlx::{Row, Sqlite, SqlitePool, Transaction};
6
7use punkgo_core::actor::{ActorRecord, ActorStatus, ActorType, CreateActorSpec, WritableTarget};
8use punkgo_core::errors::{KernelError, KernelResult};
9
10#[derive(Clone)]
11pub struct ActorStore {
12 pool: SqlitePool,
13}
14
15impl ActorStore {
16 pub fn new(pool: SqlitePool) -> Self {
17 Self { pool }
18 }
19
20 pub async fn get(&self, actor_id: &str) -> KernelResult<Option<ActorRecord>> {
22 let row = sqlx::query(
23 r#"
24 SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
25 writable_targets, energy_share, reduction_policy, created_at, updated_at
26 FROM actors
27 WHERE actor_id = ?1
28 "#,
29 )
30 .bind(actor_id)
31 .fetch_optional(&self.pool)
32 .await?;
33
34 match row {
35 Some(row) => Ok(Some(row_to_actor_record(&row)?)),
36 None => Ok(None),
37 }
38 }
39
40 pub async fn exists(&self, actor_id: &str) -> KernelResult<bool> {
42 let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
43 .bind(actor_id)
44 .fetch_optional(&self.pool)
45 .await?;
46 Ok(row.is_some())
47 }
48
49 pub async fn is_active(&self, actor_id: &str) -> KernelResult<bool> {
51 let row = sqlx::query("SELECT status FROM actors WHERE actor_id = ?1")
52 .bind(actor_id)
53 .fetch_optional(&self.pool)
54 .await?;
55 match row {
56 Some(row) => {
57 let status: String = row.get("status");
58 Ok(status == "active")
59 }
60 None => Ok(false),
61 }
62 }
63
64 pub async fn create_in_tx(
66 &self,
67 tx: &mut Transaction<'_, Sqlite>,
68 spec: &CreateActorSpec,
69 ) -> KernelResult<()> {
70 if spec.actor_id.trim().is_empty() {
71 return Err(KernelError::PolicyViolation(
72 "actor_id cannot be empty".to_string(),
73 ));
74 }
75
76 let existing = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
78 .bind(&spec.actor_id)
79 .fetch_optional(&mut **tx)
80 .await?;
81 if existing.is_some() {
82 return Err(KernelError::PolicyViolation(format!(
83 "actor already exists: {}",
84 spec.actor_id
85 )));
86 }
87
88 let lineage_json = serde_json::to_string(&spec.lineage).map_err(|e| {
89 KernelError::PolicyViolation(format!("failed to serialize lineage: {e}"))
90 })?;
91 let targets_json = serde_json::to_string(&spec.writable_targets).map_err(|e| {
92 KernelError::PolicyViolation(format!("failed to serialize writable_targets: {e}"))
93 })?;
94 let now = now_millis_string();
95
96 sqlx::query(
97 r#"
98 INSERT INTO actors (
99 actor_id, actor_type, creator_id, lineage, purpose, status,
100 writable_targets, energy_share, reduction_policy, created_at, updated_at
101 )
102 VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?7, ?8, ?9, ?9)
103 "#,
104 )
105 .bind(&spec.actor_id)
106 .bind(spec.actor_type.as_str())
107 .bind(&spec.creator_id)
108 .bind(&lineage_json)
109 .bind(&spec.purpose)
110 .bind(&targets_json)
111 .bind(spec.energy_share)
112 .bind(&spec.reduction_policy)
113 .bind(&now)
114 .execute(&mut **tx)
115 .await?;
116
117 Ok(())
118 }
119
120 pub async fn set_status_in_tx(
122 &self,
123 tx: &mut Transaction<'_, Sqlite>,
124 actor_id: &str,
125 status: &ActorStatus,
126 ) -> KernelResult<()> {
127 let result = sqlx::query(
128 r#"
129 UPDATE actors SET status = ?1, updated_at = ?2 WHERE actor_id = ?3
130 "#,
131 )
132 .bind(status.as_str())
133 .bind(now_millis_string())
134 .bind(actor_id)
135 .execute(&mut **tx)
136 .await?;
137
138 if result.rows_affected() == 0 {
139 return Err(KernelError::ActorNotFound(actor_id.to_string()));
140 }
141 Ok(())
142 }
143
144 pub async fn update_energy_share_in_tx(
150 &self,
151 tx: &mut Transaction<'_, Sqlite>,
152 actor_id: &str,
153 energy_share: f64,
154 ) -> KernelResult<()> {
155 if !energy_share.is_finite() || energy_share < 0.0 {
156 return Err(KernelError::PolicyViolation(
157 "energy_share must be a finite non-negative number".to_string(),
158 ));
159 }
160
161 let result = sqlx::query(
162 r#"
163 UPDATE actors SET energy_share = ?1, updated_at = ?2 WHERE actor_id = ?3
164 "#,
165 )
166 .bind(energy_share)
167 .bind(now_millis_string())
168 .bind(actor_id)
169 .execute(&mut **tx)
170 .await?;
171
172 if result.rows_affected() == 0 {
173 return Err(KernelError::ActorNotFound(actor_id.to_string()));
174 }
175 Ok(())
176 }
177
178 pub async fn next_sequence(&self, creator_id: &str, purpose: &str) -> KernelResult<i64> {
181 let pattern = format!("{creator_id}/{purpose}/%");
183 let row = sqlx::query("SELECT COUNT(*) AS cnt FROM actors WHERE actor_id LIKE ?1")
184 .bind(&pattern)
185 .fetch_one(&self.pool)
186 .await?;
187 let count: i64 = row.get("cnt");
188 Ok(count + 1)
189 }
190
191 pub async fn list_active_with_shares(&self) -> KernelResult<Vec<(String, f64)>> {
198 let rows = sqlx::query(
199 r#"
200 SELECT actor_id, energy_share
201 FROM actors
202 WHERE status = 'active' AND energy_share > 0.0 AND actor_type = 'agent'
203 "#,
204 )
205 .fetch_all(&self.pool)
206 .await?;
207
208 Ok(rows
209 .iter()
210 .map(|row| {
211 let actor_id: String = row.get("actor_id");
212 let share: f64 = row.get("energy_share");
213 (actor_id, share)
214 })
215 .collect())
216 }
217
218 pub async fn list_children(&self, parent_id: &str) -> KernelResult<Vec<ActorRecord>> {
220 let rows = sqlx::query(
221 r#"
222 SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
223 writable_targets, energy_share, reduction_policy, created_at, updated_at
224 FROM actors
225 WHERE creator_id = ?1
226 "#,
227 )
228 .bind(parent_id)
229 .fetch_all(&self.pool)
230 .await?;
231
232 rows.iter().map(row_to_actor_record).collect()
233 }
234
235 pub async fn list_descendants(&self, ancestor_id: &str) -> KernelResult<Vec<ActorRecord>> {
241 let lineage_pattern = format!("%\"{}\"%", ancestor_id);
242 let rows = sqlx::query(
243 r#"
244 SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
245 writable_targets, energy_share, reduction_policy, created_at, updated_at
246 FROM actors
247 WHERE lineage LIKE ?1 AND status = 'active'
248 "#,
249 )
250 .bind(&lineage_pattern)
251 .fetch_all(&self.pool)
252 .await?;
253
254 rows.iter().map(row_to_actor_record).collect()
255 }
256}
257
258fn row_to_actor_record(row: &sqlx::sqlite::SqliteRow) -> KernelResult<ActorRecord> {
263 let actor_type_str: String = row.get("actor_type");
264 let status_str: String = row.get("status");
265 let lineage_json: String = row.get("lineage");
266 let targets_json: String = row.get("writable_targets");
267 let creator_id: Option<String> = row.get("creator_id");
268 let purpose: Option<String> = row.get("purpose");
269
270 let actor_type = ActorType::parse(&actor_type_str).ok_or_else(|| {
271 KernelError::PolicyViolation(format!("invalid actor_type in DB: {actor_type_str}"))
272 })?;
273 let status = ActorStatus::parse(&status_str).ok_or_else(|| {
274 KernelError::PolicyViolation(format!("invalid actor status in DB: {status_str}"))
275 })?;
276 let lineage: Vec<String> = serde_json::from_str(&lineage_json)
277 .map_err(|e| KernelError::PolicyViolation(format!("invalid lineage JSON: {e}")))?;
278 let writable_targets: Vec<WritableTarget> = serde_json::from_str(&targets_json)
279 .map_err(|e| KernelError::PolicyViolation(format!("invalid writable_targets JSON: {e}")))?;
280
281 Ok(ActorRecord {
282 actor_id: row.get("actor_id"),
283 actor_type,
284 creator_id,
285 lineage,
286 purpose,
287 status,
288 writable_targets,
289 energy_share: row.get("energy_share"),
290 reduction_policy: row.get("reduction_policy"),
291 created_at: row.get("created_at"),
292 updated_at: row.get("updated_at"),
293 })
294}
295
296fn now_millis_string() -> String {
297 let now = std::time::SystemTime::now()
298 .duration_since(std::time::UNIX_EPOCH)
299 .unwrap_or_default();
300 now.as_millis().to_string()
301}