// fcdb_graph/lib.rs

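//! # Enishi Graph
//!
//! Graph data structures and operations for the Enishi database.
//!
//! Node contents and edge properties are stored in a content-addressed
//! `PackCAS`; the indexes over them (RID -> CID, adjacency, postings and the
//! per-node version timeline) are kept in memory.
//!
//! Merkle DAG layout: `enishi_graph` -> `rid_to_cid`, `adjacency`, `postings`, `temporal`.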
6
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use fcdb_cas::{PackCAS, PackBand};
use fcdb_core::{Cid, varint, Monoid};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{info, debug};
14
15/// Resource ID (RID) - unique identifier for graph nodes
16#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
17pub struct Rid(pub u64);
18
19impl Rid {
20    pub fn new(id: u64) -> Self {
21        Self(id)
22    }
23
24    pub fn as_u64(&self) -> u64 {
25        self.0
26    }
27}
28
29impl std::fmt::Debug for Rid {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        write!(f, "Rid({})", self.0)
32    }
33}
34
35impl std::fmt::Display for Rid {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        write!(f, "{}", self.0)
38    }
39}
40
41/// Edge label/type identifier
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
43pub struct LabelId(pub u32);
44
45impl LabelId {
46    pub fn new(id: u32) -> Self {
47        Self(id)
48    }
49}
50
51/// Temporal timestamp for versioning
52#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
53pub struct Timestamp(pub u64);
54
55impl Timestamp {
56    pub fn now() -> Self {
57        Self(
58            std::time::SystemTime::now()
59                .duration_since(std::time::UNIX_EPOCH)
60                .unwrap()
61                .as_micros() as u64,
62        )
63    }
64
65    pub fn as_u64(&self) -> u64 {
66        self.0
67    }
68}
69
70/// Graph edge representation
71#[derive(Clone, Debug, Serialize, Deserialize)]
72pub struct Edge {
73    pub from: Rid,
74    pub to: Rid,
75    pub label: LabelId,
76    pub properties: Cid, // CID of property data
77    pub created_at: Timestamp,
78    pub deleted_at: Option<Timestamp>,
79}
80
81/// Adjacency list entry
82#[derive(Clone, Debug, Serialize, Deserialize)]
83pub struct AdjEntry {
84    pub target: Rid,
85    pub label: LabelId,
86    pub properties: Cid,
87    pub timestamp: Timestamp,
88}
89
90/// Posting list for full-text search and analytics
91#[derive(Clone, Debug)]
92pub struct Posting {
93    pub term: String,
94    pub rid: Rid,
95    pub positions: Vec<u32>, // Term positions in the document
96    pub timestamp: Timestamp,
97}
98
99
100/// RID to CID mapping with temporal support
101#[derive(Clone, Debug)]
102pub struct RidMapping {
103    pub rid: Rid,
104    pub cid: Cid,
105    pub valid_from: Timestamp,
106    pub valid_to: Option<Timestamp>,
107}
108
109/// Graph database core structure
110pub struct GraphDB {
111    cas: Arc<RwLock<PackCAS>>,
112
113    // RID -> current CID mapping (in-memory cache)
114    rid_to_cid: Arc<RwLock<HashMap<Rid, Cid>>>,
115
116    // Temporal RID mappings (RID -> timeline of CIDs)
117    temporal_rid_mappings: Arc<RwLock<HashMap<Rid, BTreeMap<Timestamp, Cid>>>>,
118
119    // Adjacency lists (RID -> outgoing edges)
120    adjacency: Arc<RwLock<HashMap<Rid, Vec<AdjEntry>>>>,
121
122    // Reverse adjacency (RID -> incoming edges)
123    reverse_adjacency: Arc<RwLock<HashMap<Rid, Vec<AdjEntry>>>>,
124
125    // Posting lists for search
126    postings: Arc<RwLock<HashMap<String, Vec<Posting>>>>,
127
128    // Current timestamp for operations
129    current_timestamp: Arc<RwLock<Timestamp>>,
130}
131
132impl GraphDB {
133    /// Create a new graph database instance
134    pub async fn new(cas: PackCAS) -> Self {
135        Self {
136            cas: Arc::new(RwLock::new(cas)),
137            rid_to_cid: Arc::new(RwLock::new(HashMap::new())),
138            temporal_rid_mappings: Arc::new(RwLock::new(HashMap::new())),
139            adjacency: Arc::new(RwLock::new(HashMap::new())),
140            reverse_adjacency: Arc::new(RwLock::new(HashMap::new())),
141            postings: Arc::new(RwLock::new(HashMap::new())),
142            current_timestamp: Arc::new(RwLock::new(Timestamp::now())),
143        }
144    }
145
146    /// Set the current timestamp for operations (for testing/temporal control)
147    pub async fn set_timestamp(&self, ts: Timestamp) {
148        *self.current_timestamp.write().await = ts;
149    }
150
151    /// Create a new node with initial data
152    pub async fn create_node(&self, data: &[u8]) -> Result<Rid, Box<dyn std::error::Error>> {
153        let ts = *self.current_timestamp.read().await;
154
155        // Generate new RID (simplified - in real impl, use proper ID generation)
156        let rid = Rid(self.rid_to_cid.read().await.len() as u64 + 1);
157
158        // Store data in CAS
159        let cid = {
160            let mut cas = self.cas.write().await;
161            cas.put(data, 0, PackBand::Small).await?
162        };
163
164        // Update mappings
165        {
166            let mut rid_to_cid = self.rid_to_cid.write().await;
167            let mut temporal = self.temporal_rid_mappings.write().await;
168
169            rid_to_cid.insert(rid, cid);
170            temporal.entry(rid).or_insert_with(BTreeMap::new).insert(ts, cid);
171        }
172
173        // Index for search if it's text data
174        if let Ok(text) = std::str::from_utf8(data) {
175            self.index_text(rid, text, ts).await;
176        }
177
178        info!("Created node {} with CID {:?}", rid, cid);
179        Ok(rid)
180    }
181
182    /// Update a node's data
183    pub async fn update_node(&self, rid: Rid, data: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
184        let ts = *self.current_timestamp.read().await;
185
186        let cid = {
187            let mut cas = self.cas.write().await;
188            cas.put(data, 0, PackBand::Small).await?
189        };
190
191        // Update mappings
192        {
193            let mut rid_to_cid = self.rid_to_cid.write().await;
194            let mut temporal = self.temporal_rid_mappings.write().await;
195
196            rid_to_cid.insert(rid, cid);
197            temporal.entry(rid).or_insert_with(BTreeMap::new).insert(ts, cid);
198        }
199
200        // Re-index for search
201        if let Ok(text) = std::str::from_utf8(data) {
202            self.index_text(rid, text, ts).await;
203        }
204
205        debug!("Updated node {} to CID {:?}", rid, cid);
206        Ok(())
207    }
208
209    /// Get current data for a node
210    pub async fn get_node(&self, rid: Rid) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
211        let cid = {
212            let rid_to_cid = self.rid_to_cid.read().await;
213            rid_to_cid.get(&rid).cloned()
214        };
215
216        if let Some(cid) = cid {
217            let cas = self.cas.read().await;
218            Ok(Some(cas.get(&cid).await?))
219        } else {
220            Ok(None)
221        }
222    }
223
224    /// Get node data at a specific timestamp (temporal query)
225    pub async fn get_node_at(&self, rid: Rid, as_of: Timestamp) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
226        let cid = {
227            let temporal = self.temporal_rid_mappings.read().await;
228            if let Some(timeline) = temporal.get(&rid) {
229                // Find the most recent CID valid at as_of
230                timeline.range(..=as_of).next_back().map(|(_, cid)| *cid)
231            } else {
232                None
233            }
234        };
235
236        if let Some(cid) = cid {
237            let cas = self.cas.read().await;
238            Ok(Some(cas.get(&cid).await?))
239        } else {
240            Ok(None)
241        }
242    }
243
244    /// Create an edge between nodes
245    pub async fn create_edge(&self, from: Rid, to: Rid, label: LabelId, properties: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
246        let ts = *self.current_timestamp.read().await;
247
248        let prop_cid = {
249            let mut cas = self.cas.write().await;
250            cas.put(properties, 1, PackBand::Small).await?
251        };
252
253        let entry = AdjEntry {
254            target: to,
255            label,
256            properties: prop_cid,
257            timestamp: ts,
258        };
259
260        // Update adjacency lists
261        {
262            let mut adj = self.adjacency.write().await;
263            let mut rev_adj = self.reverse_adjacency.write().await;
264
265            adj.entry(from).or_insert_with(Vec::new).push(entry.clone());
266            rev_adj.entry(to).or_insert_with(Vec::new).push(AdjEntry {
267                target: from,
268                label,
269                properties: prop_cid,
270                timestamp: ts,
271            });
272        }
273
274        debug!("Created edge {} --({})--> {}", from, label.0, to);
275        Ok(())
276    }
277
278    /// Traverse graph from a starting node
279    pub async fn traverse(&self, from: Rid, labels: Option<&[LabelId]>, max_depth: usize, as_of: Option<Timestamp>)
280        -> Result<Vec<(Rid, usize)>, Box<dyn std::error::Error>>
281    {
282        let mut visited = HashSet::new();
283        let mut result = Vec::new();
284        let mut queue = vec![(from, 0)]; // (node, depth)
285
286        let adj = self.adjacency.read().await;
287
288        while let Some((current, depth)) = queue.pop() {
289            if depth > max_depth || !visited.insert(current) {
290                continue;
291            }
292
293            result.push((current, depth));
294
295            if depth < max_depth {
296                if let Some(edges) = adj.get(&current) {
297                    for edge in edges {
298                        // Check timestamp if as_of is specified
299                        if let Some(as_of) = as_of {
300                            if edge.timestamp > as_of {
301                                continue;
302                            }
303                        }
304
305                        // Check label filter
306                        if let Some(labels) = labels {
307                            if !labels.contains(&edge.label) {
308                                continue;
309                            }
310                        }
311
312                        queue.push((edge.target, depth + 1));
313                    }
314                }
315            }
316        }
317
318        Ok(result)
319    }
320
321    /// Search nodes by text content
322    pub async fn search(&self, query: &str) -> Result<Vec<(Rid, f32)>, Box<dyn std::error::Error>> {
323        let postings = self.postings.read().await;
324        let mut results = HashMap::new();
325
326        // Simple term-based search (no ranking yet)
327        if let Some(posts) = postings.get(query) {
328            for post in posts {
329                *results.entry(post.rid).or_insert(0.0) += 1.0; // Simple TF scoring
330            }
331        }
332
333        let mut sorted_results: Vec<_> = results.into_iter().collect();
334        sorted_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
335
336        Ok(sorted_results)
337    }
338
339    /// Index text content for search
340    async fn index_text(&self, rid: Rid, text: &str, timestamp: Timestamp) {
341        let words: Vec<&str> = text.split_whitespace().collect();
342        let mut postings = self.postings.write().await;
343
344        for (pos, word) in words.iter().enumerate() {
345            let posting = Posting {
346                term: word.to_lowercase(),
347                rid,
348                positions: vec![pos as u32],
349                timestamp,
350            };
351
352            postings.entry(word.to_lowercase())
353                .or_insert_with(Vec::new)
354                .push(posting);
355        }
356    }
357}
358
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_graph_basic_operations() {
        let temp_dir = tempdir().unwrap();
        let cas = PackCAS::open(temp_dir.path()).await.unwrap();
        let graph = GraphDB::new(cas).await;

        // Create nodes
        let node1 = graph.create_node(b"Hello World").await.unwrap();
        let node2 = graph.create_node(b"Foo Bar").await.unwrap();
        assert_ne!(node1, node2);

        // Create edge
        let label = LabelId(1);
        graph.create_edge(node1, node2, label, b"connects to").await.unwrap();

        // Retrieve data
        let data1 = graph.get_node(node1).await.unwrap().unwrap();
        assert_eq!(data1, b"Hello World");

        // Traverse: the start node comes first, and its neighbour is one hop away.
        let traversal = graph.traverse(node1, Some(&[label]), 2, None).await.unwrap();
        assert_eq!(traversal[0], (node1, 0));
        assert!(traversal.contains(&(node2, 1)));

        // Search
        let search_results = graph.search("hello").await.unwrap();
        assert!(search_results.iter().any(|(rid, _)| *rid == node1));
        let search_results = graph.search("bar").await.unwrap();
        assert!(search_results.iter().any(|(rid, _)| *rid == node2));
    }

    #[tokio::test]
    async fn test_temporal_queries() {
        let temp_dir = tempdir().unwrap();
        let cas = PackCAS::open(temp_dir.path()).await.unwrap();
        let graph = GraphDB::new(cas).await;

        // Pin explicit timestamps. The wall clock is far later than any small
        // literal, so mixing it with a fixed "future" value made the
        // historical lookup below return None.
        graph.set_timestamp(Timestamp(100)).await;
        let node = graph.create_node(b"Version 1").await.unwrap();

        graph.set_timestamp(Timestamp(200)).await;
        graph.update_node(node, b"Version 2").await.unwrap();

        // Query historical version
        let old_data = graph.get_node_at(node, Timestamp(150)).await.unwrap().unwrap();
        assert_eq!(old_data, b"Version 1");

        // A query at the exact update time sees the new version.
        let at_update = graph.get_node_at(node, Timestamp(200)).await.unwrap().unwrap();
        assert_eq!(at_update, b"Version 2");

        // Before the node existed there is nothing to return.
        assert!(graph.get_node_at(node, Timestamp(99)).await.unwrap().is_none());

        // Query current version
        let new_data = graph.get_node(node).await.unwrap().unwrap();
        assert_eq!(new_data, b"Version 2");
    }
}
410        assert_eq!(new_data, b"Version 2");
411    }
412}