1use fcdb_core::{Cid, varint, Monoid};
8use fcdb_cas::{PackCAS, PackBand};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet, BTreeMap};
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::{info, debug};
14
15#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
17pub struct Rid(pub u64);
18
19impl Rid {
20 pub fn new(id: u64) -> Self {
21 Self(id)
22 }
23
24 pub fn as_u64(&self) -> u64 {
25 self.0
26 }
27}
28
29impl std::fmt::Debug for Rid {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(f, "Rid({})", self.0)
32 }
33}
34
35impl std::fmt::Display for Rid {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 write!(f, "{}", self.0)
38 }
39}
40
41#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
43pub struct LabelId(pub u32);
44
45impl LabelId {
46 pub fn new(id: u32) -> Self {
47 Self(id)
48 }
49}
50
51#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
53pub struct Timestamp(pub u64);
54
55impl Timestamp {
56 pub fn now() -> Self {
57 Self(
58 std::time::SystemTime::now()
59 .duration_since(std::time::UNIX_EPOCH)
60 .unwrap()
61 .as_micros() as u64,
62 )
63 }
64
65 pub fn as_u64(&self) -> u64 {
66 self.0
67 }
68}
69
70#[derive(Clone, Debug, Serialize, Deserialize)]
72pub struct Edge {
73 pub from: Rid,
74 pub to: Rid,
75 pub label: LabelId,
76 pub properties: Cid, pub created_at: Timestamp,
78 pub deleted_at: Option<Timestamp>,
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize)]
83pub struct AdjEntry {
84 pub target: Rid,
85 pub label: LabelId,
86 pub properties: Cid,
87 pub timestamp: Timestamp,
88}
89
90#[derive(Clone, Debug)]
92pub struct Posting {
93 pub term: String,
94 pub rid: Rid,
95 pub positions: Vec<u32>, pub timestamp: Timestamp,
97}
98
99
100#[derive(Clone, Debug)]
102pub struct RidMapping {
103 pub rid: Rid,
104 pub cid: Cid,
105 pub valid_from: Timestamp,
106 pub valid_to: Option<Timestamp>,
107}
108
/// Graph database layered over a `PackCAS` blob store.
///
/// Node payloads and edge properties are written to the CAS; everything else
/// (id mappings, version history, adjacency lists, text postings) lives in the
/// maps below, each behind its own async `RwLock`.
pub struct GraphDB {
    /// Blob store that holds node payloads and edge property bytes.
    cas: Arc<RwLock<PackCAS>>,

    /// Latest content id for each node; overwritten on every update.
    rid_to_cid: Arc<RwLock<HashMap<Rid, Cid>>>,

    /// Per-node version history: write timestamp -> content id.
    /// Queried by `get_node_at`.
    temporal_rid_mappings: Arc<RwLock<HashMap<Rid, BTreeMap<Timestamp, Cid>>>>,

    /// Outgoing edges, keyed by source node.
    adjacency: Arc<RwLock<HashMap<Rid, Vec<AdjEntry>>>>,

    /// Incoming edges, keyed by destination node (`AdjEntry::target` is the source).
    reverse_adjacency: Arc<RwLock<HashMap<Rid, Vec<AdjEntry>>>>,

    /// Text index: lower-cased term -> postings, filled by `index_text`.
    postings: Arc<RwLock<HashMap<String, Vec<Posting>>>>,

    /// Logical clock used to stamp writes; changed only through `set_timestamp`.
    current_timestamp: Arc<RwLock<Timestamp>>,
}
131
132impl GraphDB {
133 pub async fn new(cas: PackCAS) -> Self {
135 Self {
136 cas: Arc::new(RwLock::new(cas)),
137 rid_to_cid: Arc::new(RwLock::new(HashMap::new())),
138 temporal_rid_mappings: Arc::new(RwLock::new(HashMap::new())),
139 adjacency: Arc::new(RwLock::new(HashMap::new())),
140 reverse_adjacency: Arc::new(RwLock::new(HashMap::new())),
141 postings: Arc::new(RwLock::new(HashMap::new())),
142 current_timestamp: Arc::new(RwLock::new(Timestamp::now())),
143 }
144 }
145
146 pub async fn set_timestamp(&self, ts: Timestamp) {
148 *self.current_timestamp.write().await = ts;
149 }
150
151 pub async fn create_node(&self, data: &[u8]) -> Result<Rid, Box<dyn std::error::Error>> {
153 let ts = *self.current_timestamp.read().await;
154
155 let rid = Rid(self.rid_to_cid.read().await.len() as u64 + 1);
157
158 let cid = {
160 let mut cas = self.cas.write().await;
161 cas.put(data, 0, PackBand::Small).await?
162 };
163
164 {
166 let mut rid_to_cid = self.rid_to_cid.write().await;
167 let mut temporal = self.temporal_rid_mappings.write().await;
168
169 rid_to_cid.insert(rid, cid);
170 temporal.entry(rid).or_insert_with(BTreeMap::new).insert(ts, cid);
171 }
172
173 if let Ok(text) = std::str::from_utf8(data) {
175 self.index_text(rid, text, ts).await;
176 }
177
178 info!("Created node {} with CID {:?}", rid, cid);
179 Ok(rid)
180 }
181
182 pub async fn update_node(&self, rid: Rid, data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
184 let ts = *self.current_timestamp.read().await;
185
186 let cid = {
187 let mut cas = self.cas.write().await;
188 cas.put(data, 0, PackBand::Small).await?
189 };
190
191 {
193 let mut rid_to_cid = self.rid_to_cid.write().await;
194 let mut temporal = self.temporal_rid_mappings.write().await;
195
196 rid_to_cid.insert(rid, cid);
197 temporal.entry(rid).or_insert_with(BTreeMap::new).insert(ts, cid);
198 }
199
200 if let Ok(text) = std::str::from_utf8(data) {
202 self.index_text(rid, text, ts).await;
203 }
204
205 debug!("Updated node {} to CID {:?}", rid, cid);
206 Ok(())
207 }
208
209 pub async fn get_node(&self, rid: Rid) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
211 let cid = {
212 let rid_to_cid = self.rid_to_cid.read().await;
213 rid_to_cid.get(&rid).cloned()
214 };
215
216 if let Some(cid) = cid {
217 let cas = self.cas.read().await;
218 Ok(Some(cas.get(&cid).await?))
219 } else {
220 Ok(None)
221 }
222 }
223
224 pub async fn get_node_at(&self, rid: Rid, as_of: Timestamp) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
226 let cid = {
227 let temporal = self.temporal_rid_mappings.read().await;
228 if let Some(timeline) = temporal.get(&rid) {
229 timeline.range(..=as_of).next_back().map(|(_, cid)| *cid)
231 } else {
232 None
233 }
234 };
235
236 if let Some(cid) = cid {
237 let cas = self.cas.read().await;
238 Ok(Some(cas.get(&cid).await?))
239 } else {
240 Ok(None)
241 }
242 }
243
244 pub async fn create_edge(&self, from: Rid, to: Rid, label: LabelId, properties: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
246 let ts = *self.current_timestamp.read().await;
247
248 let prop_cid = {
249 let mut cas = self.cas.write().await;
250 cas.put(properties, 1, PackBand::Small).await?
251 };
252
253 let entry = AdjEntry {
254 target: to,
255 label,
256 properties: prop_cid,
257 timestamp: ts,
258 };
259
260 {
262 let mut adj = self.adjacency.write().await;
263 let mut rev_adj = self.reverse_adjacency.write().await;
264
265 adj.entry(from).or_insert_with(Vec::new).push(entry.clone());
266 rev_adj.entry(to).or_insert_with(Vec::new).push(AdjEntry {
267 target: from,
268 label,
269 properties: prop_cid,
270 timestamp: ts,
271 });
272 }
273
274 debug!("Created edge {} --({})--> {}", from, label.0, to);
275 Ok(())
276 }
277
278 pub async fn traverse(&self, from: Rid, labels: Option<&[LabelId]>, max_depth: usize, as_of: Option<Timestamp>)
280 -> Result<Vec<(Rid, usize)>, Box<dyn std::error::Error>>
281 {
282 let mut visited = HashSet::new();
283 let mut result = Vec::new();
284 let mut queue = vec![(from, 0)]; let adj = self.adjacency.read().await;
287
288 while let Some((current, depth)) = queue.pop() {
289 if depth > max_depth || !visited.insert(current) {
290 continue;
291 }
292
293 result.push((current, depth));
294
295 if depth < max_depth {
296 if let Some(edges) = adj.get(¤t) {
297 for edge in edges {
298 if let Some(as_of) = as_of {
300 if edge.timestamp > as_of {
301 continue;
302 }
303 }
304
305 if let Some(labels) = labels {
307 if !labels.contains(&edge.label) {
308 continue;
309 }
310 }
311
312 queue.push((edge.target, depth + 1));
313 }
314 }
315 }
316 }
317
318 Ok(result)
319 }
320
321 pub async fn search(&self, query: &str) -> Result<Vec<(Rid, f32)>, Box<dyn std::error::Error>> {
323 let postings = self.postings.read().await;
324 let mut results = HashMap::new();
325
326 if let Some(posts) = postings.get(query) {
328 for post in posts {
329 *results.entry(post.rid).or_insert(0.0) += 1.0; }
331 }
332
333 let mut sorted_results: Vec<_> = results.into_iter().collect();
334 sorted_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
335
336 Ok(sorted_results)
337 }
338
339 async fn index_text(&self, rid: Rid, text: &str, timestamp: Timestamp) {
341 let words: Vec<&str> = text.split_whitespace().collect();
342 let mut postings = self.postings.write().await;
343
344 for (pos, word) in words.iter().enumerate() {
345 let posting = Posting {
346 term: word.to_lowercase(),
347 rid,
348 positions: vec![pos as u32],
349 timestamp,
350 };
351
352 postings.entry(word.to_lowercase())
353 .or_insert_with(Vec::new)
354 .push(posting);
355 }
356 }
357}
358
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Creates two nodes and an edge, then checks lookup, traversal and search.
    #[tokio::test]
    async fn test_graph_basic_operations() {
        let temp_dir = tempdir().unwrap();
        let cas = PackCAS::open(temp_dir.path()).await.unwrap();
        let graph = GraphDB::new(cas).await;

        let node1 = graph.create_node(b"Hello World").await.unwrap();
        let node2 = graph.create_node(b"Foo Bar").await.unwrap();
        assert_ne!(node1, node2);

        let label = LabelId(1);
        graph.create_edge(node1, node2, label, b"connects to").await.unwrap();

        let data1 = graph.get_node(node1).await.unwrap().unwrap();
        assert_eq!(data1, b"Hello World");

        let traversal = graph.traverse(node1, Some(&[label]), 2, None).await.unwrap();
        assert!(traversal.len() >= 2);
        assert!(traversal.contains(&(node1, 0)));
        assert!(traversal.contains(&(node2, 1)));

        // Indexed terms are lower-cased, so the query is given in lower case.
        let search_results = graph.search("hello").await.unwrap();
        assert!(!search_results.is_empty());
    }

    /// Writes two versions of a node at known timestamps and reads both back.
    #[tokio::test]
    async fn test_temporal_queries() {
        let temp_dir = tempdir().unwrap();
        let cas = PackCAS::open(temp_dir.path()).await.unwrap();
        let graph = GraphDB::new(cas).await;

        // Pin the clock before the first write. A fresh graph starts at the
        // wall-clock time, which is far later than the small timestamps used
        // below, so without this the first version would not be visible at
        // `created_ts`.
        let created_ts = Timestamp(1);
        graph.set_timestamp(created_ts).await;
        let node = graph.create_node(b"Version 1").await.unwrap();

        let updated_ts = Timestamp(1000000);
        graph.set_timestamp(updated_ts).await;
        graph.update_node(node, b"Version 2").await.unwrap();

        // Before the first write there is no version at all.
        assert!(graph.get_node_at(node, Timestamp(0)).await.unwrap().is_none());

        let old_data = graph.get_node_at(node, created_ts).await.unwrap().unwrap();
        assert_eq!(old_data, b"Version 1");

        let updated_data = graph.get_node_at(node, updated_ts).await.unwrap().unwrap();
        assert_eq!(updated_data, b"Version 2");

        let new_data = graph.get_node(node).await.unwrap().unwrap();
        assert_eq!(new_data, b"Version 2");
    }
}