Skip to main content

spec_ai/spec_ai_core/sync/
persistence_impl.rs

//! Implementation of `SyncPersistence` for `spec-ai-config::Persistence`.
//!
//! This module provides a wrapper type that implements the `SyncPersistence` trait,
//! allowing the sync engine to work with the persistence layer.
5
6use anyhow::Result;
7use crate::spec_ai_graph_sync::{ChangelogEntry, SyncPersistence, SyncedEdgeRecord, SyncedNodeRecord};
8use crate::spec_ai_knowledge_graph::{EdgeType, NodeType};
9
10use crate::spec_ai_core::persistence::Persistence;
11
12/// Wrapper around `Persistence` that implements `SyncPersistence`.
13///
14/// This wrapper is necessary due to Rust's orphan rules - we cannot implement
15/// a foreign trait (`SyncPersistence`) for a foreign type (`Persistence`) directly.
16pub struct SyncPersistenceAdapter {
17    persistence: Persistence,
18}
19
20impl SyncPersistenceAdapter {
21    /// Create a new sync persistence adapter.
22    pub fn new(persistence: Persistence) -> Self {
23        Self { persistence }
24    }
25
26    /// Get a reference to the underlying persistence.
27    pub fn persistence(&self) -> &Persistence {
28        &self.persistence
29    }
30
31    /// Get a mutable reference to the underlying persistence.
32    pub fn persistence_mut(&mut self) -> &mut Persistence {
33        &mut self.persistence
34    }
35
36    /// Consume the adapter and return the underlying persistence.
37    pub fn into_persistence(self) -> Persistence {
38        self.persistence
39    }
40}
41
42impl SyncPersistence for SyncPersistenceAdapter {
43    fn instance_id(&self) -> &str {
44        self.persistence.instance_id()
45    }
46
47    fn graph_sync_state_get(
48        &self,
49        instance_id: &str,
50        session_id: &str,
51        graph_name: &str,
52    ) -> Result<Option<String>> {
53        self.persistence
54            .graph_sync_state_get(instance_id, session_id, graph_name)
55    }
56
57    fn graph_sync_state_update(
58        &self,
59        instance_id: &str,
60        session_id: &str,
61        graph_name: &str,
62        vector_clock: &str,
63    ) -> Result<()> {
64        self.persistence
65            .graph_sync_state_update(instance_id, session_id, graph_name, vector_clock)
66    }
67
68    fn count_graph_nodes(&self, session_id: &str) -> Result<i64> {
69        self.persistence.count_graph_nodes(session_id)
70    }
71
72    fn graph_changelog_append(
73        &self,
74        session_id: &str,
75        instance_id: &str,
76        entity_type: &str,
77        entity_id: i64,
78        operation: &str,
79        vector_clock: &str,
80        data: Option<&str>,
81    ) -> Result<i64> {
82        self.persistence.graph_changelog_append(
83            session_id,
84            instance_id,
85            entity_type,
86            entity_id,
87            operation,
88            vector_clock,
89            data,
90        )
91    }
92
93    fn graph_changelog_get_since(
94        &self,
95        session_id: &str,
96        since_timestamp: &str,
97    ) -> Result<Vec<ChangelogEntry>> {
98        let entries = self
99            .persistence
100            .graph_changelog_get_since(session_id, since_timestamp)?;
101        Ok(entries
102            .into_iter()
103            .map(|e| ChangelogEntry {
104                id: e.id,
105                session_id: e.session_id,
106                instance_id: e.instance_id,
107                entity_type: e.entity_type,
108                entity_id: e.entity_id,
109                operation: e.operation,
110                vector_clock: e.vector_clock,
111                data: e.data,
112                created_at: e.created_at,
113            })
114            .collect())
115    }
116
117    fn graph_get_node_with_sync(&self, node_id: i64) -> Result<Option<SyncedNodeRecord>> {
118        let record = self.persistence.graph_get_node_with_sync(node_id)?;
119        Ok(record.map(|r| SyncedNodeRecord {
120            id: r.id,
121            session_id: r.session_id,
122            node_type: r.node_type,
123            label: r.label,
124            properties: r.properties,
125            embedding_id: r.embedding_id,
126            created_at: r.created_at,
127            updated_at: r.updated_at,
128            vector_clock: r.vector_clock,
129            last_modified_by: r.last_modified_by,
130            is_deleted: r.is_deleted,
131            sync_enabled: r.sync_enabled,
132        }))
133    }
134
135    fn graph_list_nodes_with_sync(
136        &self,
137        session_id: &str,
138        sync_enabled_only: bool,
139        include_deleted: bool,
140    ) -> Result<Vec<SyncedNodeRecord>> {
141        let records = self.persistence.graph_list_nodes_with_sync(
142            session_id,
143            sync_enabled_only,
144            include_deleted,
145        )?;
146        Ok(records
147            .into_iter()
148            .map(|r| SyncedNodeRecord {
149                id: r.id,
150                session_id: r.session_id,
151                node_type: r.node_type,
152                label: r.label,
153                properties: r.properties,
154                embedding_id: r.embedding_id,
155                created_at: r.created_at,
156                updated_at: r.updated_at,
157                vector_clock: r.vector_clock,
158                last_modified_by: r.last_modified_by,
159                is_deleted: r.is_deleted,
160                sync_enabled: r.sync_enabled,
161            })
162            .collect())
163    }
164
165    fn graph_update_node_sync_metadata(
166        &self,
167        node_id: i64,
168        vector_clock: &str,
169        last_modified_by: &str,
170        sync_enabled: bool,
171    ) -> Result<()> {
172        self.persistence.graph_update_node_sync_metadata(
173            node_id,
174            vector_clock,
175            last_modified_by,
176            sync_enabled,
177        )
178    }
179
180    fn graph_mark_node_deleted(
181        &self,
182        node_id: i64,
183        vector_clock: &str,
184        deleted_by: &str,
185    ) -> Result<()> {
186        self.persistence
187            .graph_mark_node_deleted(node_id, vector_clock, deleted_by)
188    }
189
190    fn graph_get_edge_with_sync(&self, edge_id: i64) -> Result<Option<SyncedEdgeRecord>> {
191        let record = self.persistence.graph_get_edge_with_sync(edge_id)?;
192        Ok(record.map(|r| SyncedEdgeRecord {
193            id: r.id,
194            session_id: r.session_id,
195            source_id: r.source_id,
196            target_id: r.target_id,
197            edge_type: r.edge_type,
198            predicate: r.predicate,
199            properties: r.properties,
200            weight: r.weight,
201            temporal_start: r.temporal_start,
202            temporal_end: r.temporal_end,
203            created_at: r.created_at,
204            vector_clock: r.vector_clock,
205            last_modified_by: r.last_modified_by,
206            is_deleted: r.is_deleted,
207            sync_enabled: r.sync_enabled,
208        }))
209    }
210
211    fn graph_list_edges_with_sync(
212        &self,
213        session_id: &str,
214        sync_enabled_only: bool,
215        include_deleted: bool,
216    ) -> Result<Vec<SyncedEdgeRecord>> {
217        let records = self.persistence.graph_list_edges_with_sync(
218            session_id,
219            sync_enabled_only,
220            include_deleted,
221        )?;
222        Ok(records
223            .into_iter()
224            .map(|r| SyncedEdgeRecord {
225                id: r.id,
226                session_id: r.session_id,
227                source_id: r.source_id,
228                target_id: r.target_id,
229                edge_type: r.edge_type,
230                predicate: r.predicate,
231                properties: r.properties,
232                weight: r.weight,
233                temporal_start: r.temporal_start,
234                temporal_end: r.temporal_end,
235                created_at: r.created_at,
236                vector_clock: r.vector_clock,
237                last_modified_by: r.last_modified_by,
238                is_deleted: r.is_deleted,
239                sync_enabled: r.sync_enabled,
240            })
241            .collect())
242    }
243
244    fn graph_update_edge_sync_metadata(
245        &self,
246        edge_id: i64,
247        vector_clock: &str,
248        last_modified_by: &str,
249        sync_enabled: bool,
250    ) -> Result<()> {
251        self.persistence.graph_update_edge_sync_metadata(
252            edge_id,
253            vector_clock,
254            last_modified_by,
255            sync_enabled,
256        )
257    }
258
259    fn graph_mark_edge_deleted(
260        &self,
261        edge_id: i64,
262        vector_clock: &str,
263        deleted_by: &str,
264    ) -> Result<()> {
265        self.persistence
266            .graph_mark_edge_deleted(edge_id, vector_clock, deleted_by)
267    }
268
269    fn insert_graph_node(
270        &self,
271        session_id: &str,
272        node_type: NodeType,
273        label: &str,
274        properties: &serde_json::Value,
275        embedding_id: Option<i64>,
276    ) -> Result<i64> {
277        self.persistence
278            .insert_graph_node(session_id, node_type, label, properties, embedding_id)
279    }
280
281    fn update_graph_node(&self, node_id: i64, properties: &serde_json::Value) -> Result<()> {
282        self.persistence.update_graph_node(node_id, properties)
283    }
284
285    fn insert_graph_edge(
286        &self,
287        session_id: &str,
288        source_id: i64,
289        target_id: i64,
290        edge_type: EdgeType,
291        predicate: Option<&str>,
292        properties: Option<&serde_json::Value>,
293        weight: f32,
294    ) -> Result<i64> {
295        self.persistence.insert_graph_edge(
296            session_id, source_id, target_id, edge_type, predicate, properties, weight,
297        )
298    }
299}