Skip to main content

punkgo_kernel/state/
actor_store.rs

1//! Actor persistence layer — CRUD operations on the `actors` table.
2//!
3//! Covers: PIP-001 §5 (actor types), §7 (agent conditional existence).
4
5use sqlx::{Row, Sqlite, SqlitePool, Transaction};
6
7use punkgo_core::actor::{ActorRecord, ActorStatus, ActorType, CreateActorSpec, WritableTarget};
8use punkgo_core::errors::{KernelError, KernelResult};
9
10#[derive(Clone)]
11pub struct ActorStore {
12    pool: SqlitePool,
13}
14
15impl ActorStore {
16    pub fn new(pool: SqlitePool) -> Self {
17        Self { pool }
18    }
19
20    /// Look up an actor by ID. Returns None if not found.
21    pub async fn get(&self, actor_id: &str) -> KernelResult<Option<ActorRecord>> {
22        let row = sqlx::query(
23            r#"
24            SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
25                   writable_targets, energy_share, reduction_policy, created_at, updated_at
26            FROM actors
27            WHERE actor_id = ?1
28            "#,
29        )
30        .bind(actor_id)
31        .fetch_optional(&self.pool)
32        .await?;
33
34        match row {
35            Some(row) => Ok(Some(row_to_actor_record(&row)?)),
36            None => Ok(None),
37        }
38    }
39
40    /// Check if an actor exists in the actors table.
41    pub async fn exists(&self, actor_id: &str) -> KernelResult<bool> {
42        let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
43            .bind(actor_id)
44            .fetch_optional(&self.pool)
45            .await?;
46        Ok(row.is_some())
47    }
48
49    /// Check if an actor is active (exists and status = 'active').
50    pub async fn is_active(&self, actor_id: &str) -> KernelResult<bool> {
51        let row = sqlx::query("SELECT status FROM actors WHERE actor_id = ?1")
52            .bind(actor_id)
53            .fetch_optional(&self.pool)
54            .await?;
55        match row {
56            Some(row) => {
57                let status: String = row.get("status");
58                Ok(status == "active")
59            }
60            None => Ok(false),
61        }
62    }
63
64    /// Insert a new actor within an existing transaction.
65    pub async fn create_in_tx(
66        &self,
67        tx: &mut Transaction<'_, Sqlite>,
68        spec: &CreateActorSpec,
69    ) -> KernelResult<()> {
70        if spec.actor_id.trim().is_empty() {
71            return Err(KernelError::PolicyViolation(
72                "actor_id cannot be empty".to_string(),
73            ));
74        }
75
76        // Check uniqueness
77        let existing = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
78            .bind(&spec.actor_id)
79            .fetch_optional(&mut **tx)
80            .await?;
81        if existing.is_some() {
82            return Err(KernelError::PolicyViolation(format!(
83                "actor already exists: {}",
84                spec.actor_id
85            )));
86        }
87
88        let lineage_json = serde_json::to_string(&spec.lineage).map_err(|e| {
89            KernelError::PolicyViolation(format!("failed to serialize lineage: {e}"))
90        })?;
91        let targets_json = serde_json::to_string(&spec.writable_targets).map_err(|e| {
92            KernelError::PolicyViolation(format!("failed to serialize writable_targets: {e}"))
93        })?;
94        let now = now_millis_string();
95
96        sqlx::query(
97            r#"
98            INSERT INTO actors (
99                actor_id, actor_type, creator_id, lineage, purpose, status,
100                writable_targets, energy_share, reduction_policy, created_at, updated_at
101            )
102            VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?7, ?8, ?9, ?9)
103            "#,
104        )
105        .bind(&spec.actor_id)
106        .bind(spec.actor_type.as_str())
107        .bind(&spec.creator_id)
108        .bind(&lineage_json)
109        .bind(&spec.purpose)
110        .bind(&targets_json)
111        .bind(spec.energy_share)
112        .bind(&spec.reduction_policy)
113        .bind(&now)
114        .execute(&mut **tx)
115        .await?;
116
117        Ok(())
118    }
119
120    /// Update an actor's status within an existing transaction.
121    pub async fn set_status_in_tx(
122        &self,
123        tx: &mut Transaction<'_, Sqlite>,
124        actor_id: &str,
125        status: &ActorStatus,
126    ) -> KernelResult<()> {
127        let result = sqlx::query(
128            r#"
129            UPDATE actors SET status = ?1, updated_at = ?2 WHERE actor_id = ?3
130            "#,
131        )
132        .bind(status.as_str())
133        .bind(now_millis_string())
134        .bind(actor_id)
135        .execute(&mut **tx)
136        .await?;
137
138        if result.rows_affected() == 0 {
139            return Err(KernelError::ActorNotFound(actor_id.to_string()));
140        }
141        Ok(())
142    }
143
144    /// Update an actor's energy_share within an existing transaction.
145    ///
146    /// This allows adjusting how much tick-based energy an actor receives
147    /// after creation, without needing direct database access.
148    /// Only meaningful for agents (humans do not participate in energy distribution).
149    pub async fn update_energy_share_in_tx(
150        &self,
151        tx: &mut Transaction<'_, Sqlite>,
152        actor_id: &str,
153        energy_share: f64,
154    ) -> KernelResult<()> {
155        if !energy_share.is_finite() || energy_share < 0.0 {
156            return Err(KernelError::PolicyViolation(
157                "energy_share must be a finite non-negative number".to_string(),
158            ));
159        }
160
161        let result = sqlx::query(
162            r#"
163            UPDATE actors SET energy_share = ?1, updated_at = ?2 WHERE actor_id = ?3
164            "#,
165        )
166        .bind(energy_share)
167        .bind(now_millis_string())
168        .bind(actor_id)
169        .execute(&mut **tx)
170        .await?;
171
172        if result.rows_affected() == 0 {
173            return Err(KernelError::ActorNotFound(actor_id.to_string()));
174        }
175        Ok(())
176    }
177
178    /// Get the next sequence number for a given creator + purpose pair.
179    /// Used for derived identity (agent ID generation).
180    pub async fn next_sequence(&self, creator_id: &str, purpose: &str) -> KernelResult<i64> {
181        // Count existing actors created by this creator with matching purpose prefix
182        let pattern = format!("{creator_id}/{purpose}/%");
183        let row = sqlx::query("SELECT COUNT(*) AS cnt FROM actors WHERE actor_id LIKE ?1")
184            .bind(&pattern)
185            .fetch_one(&self.pool)
186            .await?;
187        let count: i64 = row.get("cnt");
188        Ok(count + 1)
189    }
190
191    /// List all active agents with energy_share > 0.
192    /// Returns (actor_id, energy_share) pairs for energy production distribution (PIP-001 §3).
193    ///
194    /// Only agents participate in tick-based energy distribution. Humans (including root)
195    /// receive a one-time initial energy balance at creation — they perform infrequent
196    /// management operations and do not need ongoing energy production.
197    pub async fn list_active_with_shares(&self) -> KernelResult<Vec<(String, f64)>> {
198        let rows = sqlx::query(
199            r#"
200            SELECT actor_id, energy_share
201            FROM actors
202            WHERE status = 'active' AND energy_share > 0.0 AND actor_type = 'agent'
203            "#,
204        )
205        .fetch_all(&self.pool)
206        .await?;
207
208        Ok(rows
209            .iter()
210            .map(|row| {
211                let actor_id: String = row.get("actor_id");
212                let share: f64 = row.get("energy_share");
213                (actor_id, share)
214            })
215            .collect())
216    }
217
218    /// List all direct children of a given actor.
219    pub async fn list_children(&self, parent_id: &str) -> KernelResult<Vec<ActorRecord>> {
220        let rows = sqlx::query(
221            r#"
222            SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
223                   writable_targets, energy_share, reduction_policy, created_at, updated_at
224            FROM actors
225            WHERE creator_id = ?1
226            "#,
227        )
228        .bind(parent_id)
229        .fetch_all(&self.pool)
230        .await?;
231
232        rows.iter().map(row_to_actor_record).collect()
233    }
234
235    /// List all active descendants of a given actor — actors whose lineage
236    /// JSON array contains the ancestor_id.
237    ///
238    /// Used for cascade freeze operations. When an actor is frozen,
239    /// all actors in its subtree (identified by lineage) are also frozen.
240    pub async fn list_descendants(&self, ancestor_id: &str) -> KernelResult<Vec<ActorRecord>> {
241        let lineage_pattern = format!("%\"{}\"%", ancestor_id);
242        let rows = sqlx::query(
243            r#"
244            SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
245                   writable_targets, energy_share, reduction_policy, created_at, updated_at
246            FROM actors
247            WHERE lineage LIKE ?1 AND status = 'active'
248            "#,
249        )
250        .bind(&lineage_pattern)
251        .fetch_all(&self.pool)
252        .await?;
253
254        rows.iter().map(row_to_actor_record).collect()
255    }
256}
257
258// ---------------------------------------------------------------------------
259// Helpers
260// ---------------------------------------------------------------------------
261
262fn row_to_actor_record(row: &sqlx::sqlite::SqliteRow) -> KernelResult<ActorRecord> {
263    let actor_type_str: String = row.get("actor_type");
264    let status_str: String = row.get("status");
265    let lineage_json: String = row.get("lineage");
266    let targets_json: String = row.get("writable_targets");
267    let creator_id: Option<String> = row.get("creator_id");
268    let purpose: Option<String> = row.get("purpose");
269
270    let actor_type = ActorType::parse(&actor_type_str).ok_or_else(|| {
271        KernelError::PolicyViolation(format!("invalid actor_type in DB: {actor_type_str}"))
272    })?;
273    let status = ActorStatus::parse(&status_str).ok_or_else(|| {
274        KernelError::PolicyViolation(format!("invalid actor status in DB: {status_str}"))
275    })?;
276    let lineage: Vec<String> = serde_json::from_str(&lineage_json)
277        .map_err(|e| KernelError::PolicyViolation(format!("invalid lineage JSON: {e}")))?;
278    let writable_targets: Vec<WritableTarget> = serde_json::from_str(&targets_json)
279        .map_err(|e| KernelError::PolicyViolation(format!("invalid writable_targets JSON: {e}")))?;
280
281    Ok(ActorRecord {
282        actor_id: row.get("actor_id"),
283        actor_type,
284        creator_id,
285        lineage,
286        purpose,
287        status,
288        writable_targets,
289        energy_share: row.get("energy_share"),
290        reduction_policy: row.get("reduction_policy"),
291        created_at: row.get("created_at"),
292        updated_at: row.get("updated_at"),
293    })
294}
295
/// Current wall-clock time as decimal milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the result is `"0"`.
fn now_millis_string() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        // Clock before the epoch: fall back to a zero duration.
        Err(_) => 0,
    };
    millis.to_string()
}