Skip to main content

claw_core/store/
active.rs

1//! Active / scratchpad memory store.
2//!
3//! The `active_memory` table functions as a fast scratchpad for AI agent
4//! working memory. Records are keyed by `(agent_id, key)` and hold an
5//! arbitrary JSON value. Values can be upserted, fetched, and expired
6//! individually or in bulk.
7
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use sqlx::SqlitePool;
11use uuid::Uuid;
12
13use crate::error::{ClawError, ClawResult};
14
15/// A single active-memory record.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ActiveMemoryRecord {
18    /// Unique row identifier.
19    pub id: Uuid,
20    /// The agent that owns this record.
21    pub agent_id: String,
22    /// Logical key within the agent's scratchpad.
23    pub key: String,
24    /// Serialized JSON value.
25    pub value: serde_json::Value,
26    /// Timestamp when this record was created or last updated.
27    pub updated_at: DateTime<Utc>,
28    /// Optional expiry timestamp. `None` means the record never expires.
29    pub expires_at: Option<DateTime<Utc>>,
30}
31
32/// Data-access object for the `active_memory` table.
33#[derive(Debug)]
34pub struct ActiveMemoryStore<'a> {
35    pool: &'a SqlitePool,
36}
37
38impl<'a> ActiveMemoryStore<'a> {
39    /// Create a new store bound to `pool`.
40    pub fn new(pool: &'a SqlitePool) -> Self {
41        ActiveMemoryStore { pool }
42    }
43
44    /// Upsert a record into `active_memory`.
45    ///
46    /// If a record with the same `(agent_id, key)` already exists, its value
47    /// and `updated_at` timestamp are updated.
48    ///
49    /// # Errors
50    ///
51    /// Returns a [`ClawError`] if the SQL execution fails.
52    pub async fn upsert(&self, record: &ActiveMemoryRecord) -> ClawResult<()> {
53        sqlx::query(
54            r#"
55            INSERT INTO active_memory (id, agent_id, key, value, updated_at, expires_at)
56            VALUES (?, ?, ?, ?, ?, ?)
57            ON CONFLICT (agent_id, key)
58            DO UPDATE SET value = excluded.value,
59                          updated_at = excluded.updated_at,
60                          expires_at = excluded.expires_at
61            "#,
62        )
63        .bind(record.id.to_string())
64        .bind(&record.agent_id)
65        .bind(&record.key)
66        .bind(serde_json::to_string(&record.value)?)
67        .bind(record.updated_at.to_rfc3339())
68        .bind(record.expires_at.map(|t| t.to_rfc3339()))
69        .execute(self.pool)
70        .await?;
71
72        Ok(())
73    }
74
75    /// Fetch the record for `(agent_id, key)`, if it exists.
76    ///
77    /// # Errors
78    ///
79    /// Returns a [`ClawError`] if the query fails.
80    pub async fn get(&self, agent_id: &str, key: &str) -> ClawResult<Option<ActiveMemoryRecord>> {
81        let row = sqlx::query_as::<_, (String, String, String, String, String, Option<String>)>(
82            "SELECT id, agent_id, key, value, updated_at, expires_at \
83             FROM active_memory WHERE agent_id = ? AND key = ?",
84        )
85        .bind(agent_id)
86        .bind(key)
87        .fetch_optional(self.pool)
88        .await?;
89
90        row.map(|(id, agent_id, key, value, updated_at, expires_at)| {
91            Ok(ActiveMemoryRecord {
92                id: Uuid::parse_str(&id).map_err(|e| ClawError::Store(e.to_string()))?,
93                agent_id,
94                key,
95                value: serde_json::from_str(&value)?,
96                updated_at: DateTime::parse_from_rfc3339(&updated_at)
97                    .map_err(|e| ClawError::Store(e.to_string()))?
98                    .with_timezone(&Utc),
99                expires_at: expires_at
100                    .map(|s| {
101                        DateTime::parse_from_rfc3339(&s)
102                            .map(|dt| dt.with_timezone(&Utc))
103                            .map_err(|e| ClawError::Store(e.to_string()))
104                    })
105                    .transpose()?,
106            })
107        })
108        .transpose()
109    }
110
111    /// Delete all records belonging to `agent_id`.
112    ///
113    /// # Errors
114    ///
115    /// Returns a [`ClawError`] if the SQL execution fails.
116    pub async fn clear_agent(&self, agent_id: &str) -> ClawResult<u64> {
117        let result = sqlx::query("DELETE FROM active_memory WHERE agent_id = ?")
118            .bind(agent_id)
119            .execute(self.pool)
120            .await?;
121
122        Ok(result.rows_affected())
123    }
124}