Skip to main content

punkgo_kernel/state/
actor_store.rs

1//! Actor persistence layer — CRUD operations on the `actors` table.
2//!
3//! Covers: PIP-001 §5 (actor types), §7 (agent conditional existence).
4
5use sqlx::{Row, Sqlite, SqlitePool, Transaction};
6
7use punkgo_core::actor::{ActorRecord, ActorStatus, ActorType, CreateActorSpec, WritableTarget};
8use punkgo_core::errors::{KernelError, KernelResult};
9
10#[derive(Clone)]
11pub struct ActorStore {
12    pool: SqlitePool,
13}
14
15impl ActorStore {
16    pub fn new(pool: SqlitePool) -> Self {
17        Self { pool }
18    }
19
20    /// Look up an actor by ID. Returns None if not found.
21    pub async fn get(&self, actor_id: &str) -> KernelResult<Option<ActorRecord>> {
22        let row = sqlx::query(
23            r#"
24            SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
25                   writable_targets, energy_share, reduction_policy, created_at, updated_at
26            FROM actors
27            WHERE actor_id = ?1
28            "#,
29        )
30        .bind(actor_id)
31        .fetch_optional(&self.pool)
32        .await?;
33
34        match row {
35            Some(row) => Ok(Some(row_to_actor_record(&row)?)),
36            None => Ok(None),
37        }
38    }
39
40    /// Check if an actor exists in the actors table.
41    pub async fn exists(&self, actor_id: &str) -> KernelResult<bool> {
42        let row = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
43            .bind(actor_id)
44            .fetch_optional(&self.pool)
45            .await?;
46        Ok(row.is_some())
47    }
48
49    /// Check if an actor is active (exists and status = 'active').
50    pub async fn is_active(&self, actor_id: &str) -> KernelResult<bool> {
51        let row = sqlx::query("SELECT status FROM actors WHERE actor_id = ?1")
52            .bind(actor_id)
53            .fetch_optional(&self.pool)
54            .await?;
55        match row {
56            Some(row) => {
57                let status: String = row.get("status");
58                Ok(status == "active")
59            }
60            None => Ok(false),
61        }
62    }
63
64    /// Insert a new actor within an existing transaction.
65    pub async fn create_in_tx(
66        &self,
67        tx: &mut Transaction<'_, Sqlite>,
68        spec: &CreateActorSpec,
69    ) -> KernelResult<()> {
70        if spec.actor_id.trim().is_empty() {
71            return Err(KernelError::PolicyViolation(
72                "actor_id cannot be empty".to_string(),
73            ));
74        }
75
76        // Check uniqueness
77        let existing = sqlx::query("SELECT actor_id FROM actors WHERE actor_id = ?1")
78            .bind(&spec.actor_id)
79            .fetch_optional(&mut **tx)
80            .await?;
81        if existing.is_some() {
82            return Err(KernelError::PolicyViolation(format!(
83                "actor already exists: {}",
84                spec.actor_id
85            )));
86        }
87
88        let lineage_json = serde_json::to_string(&spec.lineage).map_err(|e| {
89            KernelError::PolicyViolation(format!("failed to serialize lineage: {e}"))
90        })?;
91        let targets_json = serde_json::to_string(&spec.writable_targets).map_err(|e| {
92            KernelError::PolicyViolation(format!("failed to serialize writable_targets: {e}"))
93        })?;
94        let now = now_millis_string();
95
96        sqlx::query(
97            r#"
98            INSERT INTO actors (
99                actor_id, actor_type, creator_id, lineage, purpose, status,
100                writable_targets, energy_share, reduction_policy, created_at, updated_at
101            )
102            VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?7, ?8, ?9, ?9)
103            "#,
104        )
105        .bind(&spec.actor_id)
106        .bind(spec.actor_type.as_str())
107        .bind(&spec.creator_id)
108        .bind(&lineage_json)
109        .bind(&spec.purpose)
110        .bind(&targets_json)
111        .bind(spec.energy_share)
112        .bind(&spec.reduction_policy)
113        .bind(&now)
114        .execute(&mut **tx)
115        .await?;
116
117        Ok(())
118    }
119
120    /// Update an actor's status within an existing transaction.
121    pub async fn set_status_in_tx(
122        &self,
123        tx: &mut Transaction<'_, Sqlite>,
124        actor_id: &str,
125        status: &ActorStatus,
126    ) -> KernelResult<()> {
127        let result = sqlx::query(
128            r#"
129            UPDATE actors SET status = ?1, updated_at = ?2 WHERE actor_id = ?3
130            "#,
131        )
132        .bind(status.as_str())
133        .bind(now_millis_string())
134        .bind(actor_id)
135        .execute(&mut **tx)
136        .await?;
137
138        if result.rows_affected() == 0 {
139            return Err(KernelError::ActorNotFound(actor_id.to_string()));
140        }
141        Ok(())
142    }
143
144    /// Get the next sequence number for a given creator + purpose pair.
145    /// Used for derived identity (agent ID generation).
146    pub async fn next_sequence(&self, creator_id: &str, purpose: &str) -> KernelResult<i64> {
147        // Count existing actors created by this creator with matching purpose prefix
148        let pattern = format!("{creator_id}/{purpose}/%");
149        let row = sqlx::query("SELECT COUNT(*) AS cnt FROM actors WHERE actor_id LIKE ?1")
150            .bind(&pattern)
151            .fetch_one(&self.pool)
152            .await?;
153        let count: i64 = row.get("cnt");
154        Ok(count + 1)
155    }
156
157    /// List all active actors with energy_share > 0.
158    /// Returns (actor_id, energy_share) pairs for energy production distribution (PIP-001 §3).
159    pub async fn list_active_with_shares(&self) -> KernelResult<Vec<(String, f64)>> {
160        let rows = sqlx::query(
161            r#"
162            SELECT actor_id, energy_share
163            FROM actors
164            WHERE status = 'active' AND energy_share > 0.0
165            "#,
166        )
167        .fetch_all(&self.pool)
168        .await?;
169
170        Ok(rows
171            .iter()
172            .map(|row| {
173                let actor_id: String = row.get("actor_id");
174                let share: f64 = row.get("energy_share");
175                (actor_id, share)
176            })
177            .collect())
178    }
179
180    /// List all direct children of a given actor.
181    pub async fn list_children(&self, parent_id: &str) -> KernelResult<Vec<ActorRecord>> {
182        let rows = sqlx::query(
183            r#"
184            SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
185                   writable_targets, energy_share, reduction_policy, created_at, updated_at
186            FROM actors
187            WHERE creator_id = ?1
188            "#,
189        )
190        .bind(parent_id)
191        .fetch_all(&self.pool)
192        .await?;
193
194        rows.iter().map(row_to_actor_record).collect()
195    }
196
197    /// List all active descendants of a given actor — actors whose lineage
198    /// JSON array contains the ancestor_id.
199    ///
200    /// Used for cascade freeze operations. When an actor is frozen,
201    /// all actors in its subtree (identified by lineage) are also frozen.
202    pub async fn list_descendants(&self, ancestor_id: &str) -> KernelResult<Vec<ActorRecord>> {
203        let lineage_pattern = format!("%\"{}\"%", ancestor_id);
204        let rows = sqlx::query(
205            r#"
206            SELECT actor_id, actor_type, creator_id, lineage, purpose, status,
207                   writable_targets, energy_share, reduction_policy, created_at, updated_at
208            FROM actors
209            WHERE lineage LIKE ?1 AND status = 'active'
210            "#,
211        )
212        .bind(&lineage_pattern)
213        .fetch_all(&self.pool)
214        .await?;
215
216        rows.iter().map(row_to_actor_record).collect()
217    }
218}
219
220// ---------------------------------------------------------------------------
221// Helpers
222// ---------------------------------------------------------------------------
223
224fn row_to_actor_record(row: &sqlx::sqlite::SqliteRow) -> KernelResult<ActorRecord> {
225    let actor_type_str: String = row.get("actor_type");
226    let status_str: String = row.get("status");
227    let lineage_json: String = row.get("lineage");
228    let targets_json: String = row.get("writable_targets");
229    let creator_id: Option<String> = row.get("creator_id");
230    let purpose: Option<String> = row.get("purpose");
231
232    let actor_type = ActorType::parse(&actor_type_str).ok_or_else(|| {
233        KernelError::PolicyViolation(format!("invalid actor_type in DB: {actor_type_str}"))
234    })?;
235    let status = ActorStatus::parse(&status_str).ok_or_else(|| {
236        KernelError::PolicyViolation(format!("invalid actor status in DB: {status_str}"))
237    })?;
238    let lineage: Vec<String> = serde_json::from_str(&lineage_json)
239        .map_err(|e| KernelError::PolicyViolation(format!("invalid lineage JSON: {e}")))?;
240    let writable_targets: Vec<WritableTarget> = serde_json::from_str(&targets_json)
241        .map_err(|e| KernelError::PolicyViolation(format!("invalid writable_targets JSON: {e}")))?;
242
243    Ok(ActorRecord {
244        actor_id: row.get("actor_id"),
245        actor_type,
246        creator_id,
247        lineage,
248        purpose,
249        status,
250        writable_targets,
251        energy_share: row.get("energy_share"),
252        reduction_policy: row.get("reduction_policy"),
253        created_at: row.get("created_at"),
254        updated_at: row.get("updated_at"),
255    })
256}
257
/// Current wall-clock time as decimal milliseconds since the Unix epoch.
///
/// Falls back to `"0"` if the system clock reads earlier than the epoch.
fn now_millis_string() -> String {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let millis = match since_epoch {
        Ok(elapsed) => elapsed.as_millis(),
        // Clock is before the epoch: use a zero duration.
        Err(_) => 0,
    };
    millis.to_string()
}