Skip to main content

clawdentity_core/db/
mod.rs

1pub mod inbound;
2pub mod outbound;
3pub mod peers;
4pub mod verify_cache;
5
6pub use inbound::*;
7pub use outbound::*;
8pub use peers::*;
9pub use verify_cache::*;
10
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex};
14
15use rusqlite::{Connection, OptionalExtension, params};
16
17use crate::config::{ConfigPathOptions, get_config_dir};
18use crate::error::{CoreError, Result};
19
20pub const SQLITE_FILE_NAME: &str = "clawdentity.sqlite3";
21
22const MIGRATION_NAME_PHASE3: &str = "0001_phase3_persistence_model";
23const MIGRATION_SQL_PHASE3: &str = r#"
24CREATE TABLE IF NOT EXISTS peers (
25    alias TEXT PRIMARY KEY,
26    did TEXT NOT NULL,
27    proxy_url TEXT NOT NULL,
28    agent_name TEXT,
29    human_name TEXT,
30    created_at_ms INTEGER NOT NULL,
31    updated_at_ms INTEGER NOT NULL
32);
33
34CREATE TABLE IF NOT EXISTS outbound_queue (
35    frame_id TEXT PRIMARY KEY,
36    frame_version INTEGER NOT NULL,
37    frame_type TEXT NOT NULL,
38    to_agent_did TEXT NOT NULL,
39    payload_json TEXT NOT NULL,
40    conversation_id TEXT,
41    reply_to TEXT,
42    created_at_ms INTEGER NOT NULL
43);
44
45CREATE TABLE IF NOT EXISTS inbound_pending (
46    request_id TEXT PRIMARY KEY,
47    frame_id TEXT NOT NULL,
48    from_agent_did TEXT NOT NULL,
49    to_agent_did TEXT NOT NULL,
50    payload_json TEXT NOT NULL,
51    payload_bytes INTEGER NOT NULL,
52    received_at_ms INTEGER NOT NULL,
53    next_attempt_at_ms INTEGER NOT NULL,
54    attempt_count INTEGER NOT NULL,
55    last_error TEXT,
56    last_attempt_at_ms INTEGER,
57    conversation_id TEXT,
58    reply_to TEXT
59);
60
61CREATE TABLE IF NOT EXISTS inbound_dead_letter (
62    request_id TEXT PRIMARY KEY,
63    frame_id TEXT NOT NULL,
64    from_agent_did TEXT NOT NULL,
65    to_agent_did TEXT NOT NULL,
66    payload_json TEXT NOT NULL,
67    payload_bytes INTEGER NOT NULL,
68    received_at_ms INTEGER NOT NULL,
69    attempt_count INTEGER NOT NULL,
70    last_error TEXT,
71    last_attempt_at_ms INTEGER,
72    conversation_id TEXT,
73    reply_to TEXT,
74    dead_lettered_at_ms INTEGER NOT NULL,
75    dead_letter_reason TEXT NOT NULL
76);
77
78CREATE TABLE IF NOT EXISTS inbound_events (
79    id INTEGER PRIMARY KEY AUTOINCREMENT,
80    at_ms INTEGER NOT NULL,
81    event_type TEXT NOT NULL,
82    request_id TEXT,
83    details_json TEXT
84);
85
86CREATE TABLE IF NOT EXISTS verify_cache (
87    cache_key TEXT PRIMARY KEY,
88    registry_url TEXT NOT NULL,
89    fetched_at_ms INTEGER NOT NULL,
90    payload_json TEXT NOT NULL
91);
92
93CREATE INDEX IF NOT EXISTS idx_outbound_created_at_ms
94    ON outbound_queue(created_at_ms);
95CREATE INDEX IF NOT EXISTS idx_inbound_pending_next_attempt
96    ON inbound_pending(next_attempt_at_ms);
97CREATE INDEX IF NOT EXISTS idx_inbound_dead_letter_dead_lettered_at
98    ON inbound_dead_letter(dead_lettered_at_ms);
99CREATE INDEX IF NOT EXISTS idx_verify_cache_fetched_at_ms
100    ON verify_cache(fetched_at_ms);
101"#;
102const MIGRATION_NAME_PHASE4: &str = "0002_outbound_dead_letter";
103const MIGRATION_SQL_PHASE4: &str = r#"
104CREATE TABLE IF NOT EXISTS outbound_dead_letter (
105    frame_id TEXT PRIMARY KEY,
106    frame_version INTEGER NOT NULL,
107    frame_type TEXT NOT NULL,
108    to_agent_did TEXT NOT NULL,
109    payload_json TEXT NOT NULL,
110    conversation_id TEXT,
111    reply_to TEXT,
112    created_at_ms INTEGER NOT NULL,
113    dead_lettered_at_ms INTEGER NOT NULL,
114    dead_letter_reason TEXT NOT NULL
115);
116
117CREATE INDEX IF NOT EXISTS idx_outbound_dead_letter_dead_lettered_at
118    ON outbound_dead_letter(dead_lettered_at_ms);
119"#;
120
121#[derive(Clone)]
122pub struct SqliteStore {
123    connection: Arc<Mutex<Connection>>,
124    path: PathBuf,
125}
126
127impl SqliteStore {
128    /// TODO(clawdentity): document `open`.
129    pub fn open(options: &ConfigPathOptions) -> Result<Self> {
130        let path = get_config_dir(options)?.join(SQLITE_FILE_NAME);
131        Self::open_path(path)
132    }
133
134    /// TODO(clawdentity): document `open_path`.
135    pub fn open_path(path: impl Into<PathBuf>) -> Result<Self> {
136        let path = path.into();
137        if let Some(parent) = path.parent() {
138            fs::create_dir_all(parent).map_err(|source| CoreError::Io {
139                path: parent.to_path_buf(),
140                source,
141            })?;
142        }
143
144        let connection = Connection::open(&path)?;
145        configure_connection(&connection)?;
146        apply_migrations(&connection)?;
147
148        Ok(Self {
149            connection: Arc::new(Mutex::new(connection)),
150            path,
151        })
152    }
153
154    /// TODO(clawdentity): document `path`.
155    pub fn path(&self) -> &Path {
156        &self.path
157    }
158
159    /// TODO(clawdentity): document `with_connection`.
160    pub fn with_connection<T>(
161        &self,
162        operation: impl FnOnce(&Connection) -> Result<T>,
163    ) -> Result<T> {
164        let guard = self.connection.lock().map_err(|_| {
165            CoreError::InvalidInput("sqlite connection lock is poisoned".to_string())
166        })?;
167        operation(&guard)
168    }
169}
170
171/// TODO(clawdentity): document `now_utc_ms`.
172pub fn now_utc_ms() -> i64 {
173    chrono::Utc::now().timestamp_millis()
174}
175
176fn configure_connection(connection: &Connection) -> Result<()> {
177    connection.pragma_update(None, "journal_mode", "WAL")?;
178    connection.pragma_update(None, "foreign_keys", "ON")?;
179    Ok(())
180}
181
182fn apply_migrations(connection: &Connection) -> Result<()> {
183    tracing::info!("checking database migrations");
184    connection.execute_batch(
185        "CREATE TABLE IF NOT EXISTS schema_migrations (
186            name TEXT PRIMARY KEY,
187            applied_at_ms INTEGER NOT NULL
188        );",
189    )?;
190
191    apply_migration_if_needed(connection, MIGRATION_NAME_PHASE3, MIGRATION_SQL_PHASE3)?;
192    apply_migration_if_needed(connection, MIGRATION_NAME_PHASE4, MIGRATION_SQL_PHASE4)?;
193    Ok(())
194}
195
196fn apply_migration_if_needed(connection: &Connection, name: &str, sql: &str) -> Result<()> {
197    let already_applied: Option<String> = connection
198        .query_row(
199            "SELECT name FROM schema_migrations WHERE name = ?1",
200            [name],
201            |row| row.get(0),
202        )
203        .optional()?;
204    if already_applied.is_some() {
205        tracing::info!(migration = name, "database migration already applied");
206        return Ok(());
207    }
208
209    tracing::info!(migration = name, "applying database migration");
210    connection.execute_batch(sql)?;
211    connection.execute(
212        "INSERT INTO schema_migrations (name, applied_at_ms) VALUES (?1, ?2)",
213        params![name, now_utc_ms()],
214    )?;
215    tracing::info!(migration = name, "database migration applied");
216    Ok(())
217}
218
219#[cfg(test)]
220mod tests {
221    use tempfile::TempDir;
222
223    use super::SqliteStore;
224
225    #[test]
226    fn opens_database_and_applies_phase3_schema() {
227        let temp = TempDir::new().expect("temp dir");
228        let db = SqliteStore::open_path(temp.path().join("db.sqlite3")).expect("open db");
229
230        db.with_connection(|connection| {
231            let table_count: i64 = connection.query_row(
232                "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name IN (
233                    'peers',
234                    'outbound_queue',
235                    'outbound_dead_letter',
236                    'inbound_pending',
237                    'inbound_dead_letter',
238                    'inbound_events',
239                    'verify_cache',
240                    'schema_migrations'
241                )",
242                [],
243                |row| row.get(0),
244            )?;
245            assert_eq!(table_count, 8);
246            Ok(())
247        })
248        .expect("schema query");
249    }
250}