dx_forge/storage/
db.rs

1use anyhow::Result;
2use parking_lot::Mutex;
3use rusqlite::{Connection, params};
4use std::path::Path;
5use std::sync::Arc;
6
7use crate::crdt::{Anchor, Operation};
8
/// Handle to the forge's SQLite database.
///
/// The connection is wrapped in `Arc<Mutex<..>>`; the methods on this type
/// lock it before issuing any SQL.
pub struct Database {
    /// Shared, mutex-guarded SQLite connection.
    pub conn: Arc<Mutex<Connection>>,
}
12
13impl Database {
14    pub fn new(forge_path: &Path) -> Result<Self> {
15        let db_path = forge_path.join("forge.db");
16        let conn = Connection::open(db_path)?;
17
18        Ok(Self {
19            conn: Arc::new(Mutex::new(conn)),
20        })
21    }
22
23    pub fn open(forge_path: &str) -> Result<Self> {
24        Self::new(Path::new(forge_path))
25    }
26
27    pub fn initialize(&self) -> Result<()> {
28        let conn = self.conn.lock();
29
30        conn.execute(
31            "CREATE TABLE IF NOT EXISTS operations (
32                id TEXT PRIMARY KEY,
33                timestamp TEXT NOT NULL,
34                actor_id TEXT NOT NULL,
35                file_path TEXT NOT NULL,
36                op_type TEXT NOT NULL,
37                op_data BLOB NOT NULL,
38                parent_ops TEXT
39            )",
40            [],
41        )?;
42
43        conn.execute(
44            "CREATE TABLE IF NOT EXISTS anchors (
45                id TEXT PRIMARY KEY,
46                file_path TEXT NOT NULL,
47                stable_id TEXT NOT NULL UNIQUE,
48                position BLOB NOT NULL,
49                created_at TEXT NOT NULL,
50                message TEXT,
51                tags TEXT
52            )",
53            [],
54        )?;
55
56        conn.execute(
57            "CREATE TABLE IF NOT EXISTS annotations (
58                id TEXT PRIMARY KEY,
59                file_path TEXT NOT NULL,
60                anchor_id TEXT,
61                line INTEGER NOT NULL,
62                content TEXT NOT NULL,
63                author TEXT NOT NULL,
64                created_at TEXT NOT NULL,
65                is_ai BOOLEAN NOT NULL,
66                FOREIGN KEY(anchor_id) REFERENCES anchors(id)
67            )",
68            [],
69        )?;
70
71        conn.execute(
72            "CREATE INDEX IF NOT EXISTS idx_ops_file_time
73             ON operations(file_path, timestamp)",
74            [],
75        )?;
76
77        conn.execute(
78            "CREATE INDEX IF NOT EXISTS idx_anchors_file
79             ON anchors(file_path)",
80            [],
81        )?;
82
83        conn.execute(
84            "CREATE INDEX IF NOT EXISTS idx_annotations_file
85             ON annotations(file_path, line)",
86            [],
87        )?;
88
89        Ok(())
90    }
91
92    pub fn store_operation(&self, op: &Operation) -> Result<bool> {
93        let conn = self.conn.lock();
94        let op_data = bincode::serialize(&op.op_type)?;
95        let parent_ops = serde_json::to_string(&op.parent_ops)?;
96
97        conn.execute(
98            "INSERT OR IGNORE INTO operations (id, timestamp, actor_id, file_path, op_type, op_data, parent_ops)
99             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
100            params![
101                op.id.to_string(),
102                op.timestamp.to_rfc3339(),
103                op.actor_id,
104                op.file_path,
105                format!("{:?}", op.op_type).split('{').next().unwrap(),
106                op_data,
107                parent_ops,
108            ],
109        )
110        .map(|changes| changes > 0)
111        .map_err(Into::into)
112    }
113
114    pub fn get_operations(&self, file: Option<&Path>, limit: usize) -> Result<Vec<Operation>> {
115        let conn = self.conn.lock();
116
117        let query = if let Some(f) = file {
118            format!(
119                "SELECT id, timestamp, actor_id, file_path, op_data, parent_ops
120                 FROM operations
121                 WHERE file_path = '{}'
122                 ORDER BY timestamp DESC
123                 LIMIT {}",
124                f.display(),
125                limit
126            )
127        } else {
128            format!(
129                "SELECT id, timestamp, actor_id, file_path, op_data, parent_ops
130                 FROM operations
131                 ORDER BY timestamp DESC
132                 LIMIT {}",
133                limit
134            )
135        };
136
137        let mut stmt = conn.prepare(&query)?;
138        let ops = stmt.query_map([], |row| {
139            let id: String = row.get(0)?;
140            let timestamp: String = row.get(1)?;
141            let actor_id: String = row.get(2)?;
142            let file_path: String = row.get(3)?;
143            let op_data: Vec<u8> = row.get(4)?;
144            let parent_ops: String = row.get(5)?;
145
146            let op_type = bincode::deserialize(&op_data).unwrap();
147            let parents: Vec<uuid::Uuid> = serde_json::from_str(&parent_ops).unwrap();
148
149            Ok(Operation {
150                id: uuid::Uuid::parse_str(&id).unwrap(),
151                timestamp: chrono::DateTime::parse_from_rfc3339(&timestamp)
152                    .unwrap()
153                    .into(),
154                actor_id,
155                file_path,
156                op_type,
157                parent_ops: parents,
158            })
159        })?;
160
161        Ok(ops.collect::<Result<Vec<_>, _>>()?)
162    }
163
164    pub fn store_anchor(&self, anchor: &Anchor) -> Result<()> {
165        let conn = self.conn.lock();
166        let position = bincode::serialize(&anchor.position)?;
167        let tags = serde_json::to_string(&anchor.tags)?;
168
169        conn.execute(
170            "INSERT INTO anchors (id, file_path, stable_id, position, created_at, message, tags)
171             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
172            params![
173                anchor.id.to_string(),
174                anchor.file_path,
175                anchor.stable_id,
176                position,
177                anchor.created_at.to_rfc3339(),
178                anchor.message,
179                tags,
180            ],
181        )?;
182
183        Ok(())
184    }
185}