1pub mod inbound;
2pub mod outbound;
3pub mod peers;
4pub mod verify_cache;
5
6pub use inbound::*;
7pub use outbound::*;
8pub use peers::*;
9pub use verify_cache::*;
10
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex};
14
15use rusqlite::{Connection, OptionalExtension, params};
16
17use crate::config::{ConfigPathOptions, get_config_dir};
18use crate::error::{CoreError, Result};
19
/// File name of the SQLite database; `SqliteStore::open` joins it onto the
/// configuration directory to build the database path.
pub const SQLITE_FILE_NAME: &str = "clawdentity.sqlite3";
21
/// Name under which the phase-3 migration is recorded in `schema_migrations`.
const MIGRATION_NAME_PHASE3: &str = "0001_phase3_persistence_model";
/// SQL batch for the phase-3 migration.
///
/// Creates the `peers`, `outbound_queue`, `inbound_pending`,
/// `inbound_dead_letter`, `inbound_events` and `verify_cache` tables, plus
/// indexes on the timestamp columns `outbound_queue.created_at_ms`,
/// `inbound_pending.next_attempt_at_ms`,
/// `inbound_dead_letter.dead_lettered_at_ms` and `verify_cache.fetched_at_ms`.
/// Every statement uses `IF NOT EXISTS`, so re-running the batch does not fail
/// on objects that already exist.
const MIGRATION_SQL_PHASE3: &str = r#"
CREATE TABLE IF NOT EXISTS peers (
 alias TEXT PRIMARY KEY,
 did TEXT NOT NULL,
 proxy_url TEXT NOT NULL,
 agent_name TEXT,
 human_name TEXT,
 created_at_ms INTEGER NOT NULL,
 updated_at_ms INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS outbound_queue (
 frame_id TEXT PRIMARY KEY,
 frame_version INTEGER NOT NULL,
 frame_type TEXT NOT NULL,
 to_agent_did TEXT NOT NULL,
 payload_json TEXT NOT NULL,
 conversation_id TEXT,
 reply_to TEXT,
 created_at_ms INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS inbound_pending (
 request_id TEXT PRIMARY KEY,
 frame_id TEXT NOT NULL,
 from_agent_did TEXT NOT NULL,
 to_agent_did TEXT NOT NULL,
 payload_json TEXT NOT NULL,
 payload_bytes INTEGER NOT NULL,
 received_at_ms INTEGER NOT NULL,
 next_attempt_at_ms INTEGER NOT NULL,
 attempt_count INTEGER NOT NULL,
 last_error TEXT,
 last_attempt_at_ms INTEGER,
 conversation_id TEXT,
 reply_to TEXT
);

CREATE TABLE IF NOT EXISTS inbound_dead_letter (
 request_id TEXT PRIMARY KEY,
 frame_id TEXT NOT NULL,
 from_agent_did TEXT NOT NULL,
 to_agent_did TEXT NOT NULL,
 payload_json TEXT NOT NULL,
 payload_bytes INTEGER NOT NULL,
 received_at_ms INTEGER NOT NULL,
 attempt_count INTEGER NOT NULL,
 last_error TEXT,
 last_attempt_at_ms INTEGER,
 conversation_id TEXT,
 reply_to TEXT,
 dead_lettered_at_ms INTEGER NOT NULL,
 dead_letter_reason TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS inbound_events (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 at_ms INTEGER NOT NULL,
 event_type TEXT NOT NULL,
 request_id TEXT,
 details_json TEXT
);

CREATE TABLE IF NOT EXISTS verify_cache (
 cache_key TEXT PRIMARY KEY,
 registry_url TEXT NOT NULL,
 fetched_at_ms INTEGER NOT NULL,
 payload_json TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbound_created_at_ms
 ON outbound_queue(created_at_ms);
CREATE INDEX IF NOT EXISTS idx_inbound_pending_next_attempt
 ON inbound_pending(next_attempt_at_ms);
CREATE INDEX IF NOT EXISTS idx_inbound_dead_letter_dead_lettered_at
 ON inbound_dead_letter(dead_lettered_at_ms);
CREATE INDEX IF NOT EXISTS idx_verify_cache_fetched_at_ms
 ON verify_cache(fetched_at_ms);
"#;
/// Name under which the phase-4 migration is recorded in `schema_migrations`.
const MIGRATION_NAME_PHASE4: &str = "0002_outbound_dead_letter";
/// SQL batch for the phase-4 migration.
///
/// Creates the `outbound_dead_letter` table, which has the same columns as
/// `outbound_queue` plus `dead_lettered_at_ms` and `dead_letter_reason`, and an
/// index on `dead_lettered_at_ms`. Both statements use `IF NOT EXISTS`.
const MIGRATION_SQL_PHASE4: &str = r#"
CREATE TABLE IF NOT EXISTS outbound_dead_letter (
 frame_id TEXT PRIMARY KEY,
 frame_version INTEGER NOT NULL,
 frame_type TEXT NOT NULL,
 to_agent_did TEXT NOT NULL,
 payload_json TEXT NOT NULL,
 conversation_id TEXT,
 reply_to TEXT,
 created_at_ms INTEGER NOT NULL,
 dead_lettered_at_ms INTEGER NOT NULL,
 dead_letter_reason TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_outbound_dead_letter_dead_lettered_at
 ON outbound_dead_letter(dead_lettered_at_ms);
"#;
120
/// Cloneable handle to the SQLite database.
///
/// The connection is held behind an `Arc<Mutex<_>>`, so every clone shares the
/// same underlying connection and access through `with_connection` is
/// serialized by the mutex.
#[derive(Clone)]
pub struct SqliteStore {
    /// Shared connection; locked for the duration of each `with_connection` call.
    connection: Arc<Mutex<Connection>>,
    /// Path that was handed to `Connection::open` when the store was created.
    path: PathBuf,
}
126
127impl SqliteStore {
128 pub fn open(options: &ConfigPathOptions) -> Result<Self> {
130 let path = get_config_dir(options)?.join(SQLITE_FILE_NAME);
131 Self::open_path(path)
132 }
133
134 pub fn open_path(path: impl Into<PathBuf>) -> Result<Self> {
136 let path = path.into();
137 if let Some(parent) = path.parent() {
138 fs::create_dir_all(parent).map_err(|source| CoreError::Io {
139 path: parent.to_path_buf(),
140 source,
141 })?;
142 }
143
144 let connection = Connection::open(&path)?;
145 configure_connection(&connection)?;
146 apply_migrations(&connection)?;
147
148 Ok(Self {
149 connection: Arc::new(Mutex::new(connection)),
150 path,
151 })
152 }
153
154 pub fn path(&self) -> &Path {
156 &self.path
157 }
158
159 pub fn with_connection<T>(
161 &self,
162 operation: impl FnOnce(&Connection) -> Result<T>,
163 ) -> Result<T> {
164 let guard = self.connection.lock().map_err(|_| {
165 CoreError::InvalidInput("sqlite connection lock is poisoned".to_string())
166 })?;
167 operation(&guard)
168 }
169}
170
171pub fn now_utc_ms() -> i64 {
173 chrono::Utc::now().timestamp_millis()
174}
175
176fn configure_connection(connection: &Connection) -> Result<()> {
177 connection.pragma_update(None, "journal_mode", "WAL")?;
178 connection.pragma_update(None, "foreign_keys", "ON")?;
179 Ok(())
180}
181
182fn apply_migrations(connection: &Connection) -> Result<()> {
183 tracing::info!("checking database migrations");
184 connection.execute_batch(
185 "CREATE TABLE IF NOT EXISTS schema_migrations (
186 name TEXT PRIMARY KEY,
187 applied_at_ms INTEGER NOT NULL
188 );",
189 )?;
190
191 apply_migration_if_needed(connection, MIGRATION_NAME_PHASE3, MIGRATION_SQL_PHASE3)?;
192 apply_migration_if_needed(connection, MIGRATION_NAME_PHASE4, MIGRATION_SQL_PHASE4)?;
193 Ok(())
194}
195
196fn apply_migration_if_needed(connection: &Connection, name: &str, sql: &str) -> Result<()> {
197 let already_applied: Option<String> = connection
198 .query_row(
199 "SELECT name FROM schema_migrations WHERE name = ?1",
200 [name],
201 |row| row.get(0),
202 )
203 .optional()?;
204 if already_applied.is_some() {
205 tracing::info!(migration = name, "database migration already applied");
206 return Ok(());
207 }
208
209 tracing::info!(migration = name, "applying database migration");
210 connection.execute_batch(sql)?;
211 connection.execute(
212 "INSERT INTO schema_migrations (name, applied_at_ms) VALUES (?1, ?2)",
213 params![name, now_utc_ms()],
214 )?;
215 tracing::info!(migration = name, "database migration applied");
216 Ok(())
217}
218
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::SqliteStore;

    /// Opening a fresh database must leave all seven data tables and the
    /// `schema_migrations` ledger in place.
    #[test]
    fn opens_database_and_applies_phase3_schema() {
        const COUNT_EXPECTED_TABLES_SQL: &str =
            "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name IN (
 'peers',
 'outbound_queue',
 'outbound_dead_letter',
 'inbound_pending',
 'inbound_dead_letter',
 'inbound_events',
 'verify_cache',
 'schema_migrations'
 )";

        let scratch_dir = TempDir::new().expect("temp dir");
        let db_file = scratch_dir.path().join("db.sqlite3");
        let store = SqliteStore::open_path(db_file).expect("open db");

        let tables_found = store
            .with_connection(|connection| {
                let count: i64 =
                    connection.query_row(COUNT_EXPECTED_TABLES_SQL, [], |row| row.get(0))?;
                Ok(count)
            })
            .expect("schema query");

        assert_eq!(tables_found, 8);
    }
}