Skip to main content

oxirs_physics/sync/
bidirectional.rs

1//! Periodic bidirectional sync orchestrator.
2//!
3//! [`BidirectionalSync`] takes a [`PhysicsState`] producer (the simulator)
4//! and an `RdfPropertyRow` consumer (the RDF graph) and drives a periodic
5//! round-trip:
6//!
7//! 1. Snapshot the simulator state at step `t`.
8//! 2. Emit a SPARQL `INSERT DATA` (or `DELETE/INSERT`) covering only the
9//!    properties that changed since the last snapshot.
10//! 3. Optionally pull external updates back from RDF and re-extract a
11//!    [`PhysicsState`] for re-initialisation.
12//!
//! The orchestrator never touches a live RDF store directly — the push leg
//! returns the SPARQL text for the caller to execute, and the pull leg reads
//! rows through a caller-supplied closure. This keeps the module easy to
//! test and avoids mandatory `RdfStore` dependencies.
16
17use std::time::{Duration, Instant};
18
19use crate::error::{PhysicsError, PhysicsResult};
20
21use super::rdf_to_state::{RdfPropertyRow, RdfToStateExtractor, RdfToStateOutput};
22use super::state_to_rdf::{
23    state_diff, PhysicsState, StateDiff, StateGraphConfig, StateToRdfWriter,
24};
25
/// What a single orchestrator pass ended up doing.
///
/// Carried in [`BidirectionalSyncReport::direction`] so callers can tell a
/// real push or pull apart from a throttled call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncDirection {
    /// The simulator state was rendered for the RDF graph (physics → RDF).
    StateToRdf,
    /// Property rows were read back and turned into a physics state
    /// (RDF → physics).
    RdfToState,
    /// Nothing was done because the minimum sync interval has not yet
    /// elapsed.
    Skipped,
}
36
37/// Configuration for the bidirectional orchestrator.
38#[derive(Debug, Clone)]
39pub struct BidirectionalSyncConfig {
40    /// Minimum interval between sync passes. The orchestrator returns
41    /// [`SyncDirection::Skipped`] when called before the interval elapses.
42    pub min_interval: Duration,
43    /// Whether the orchestrator should emit a *full* snapshot the first
44    /// time it observes a state. Subsequent passes always use diffs.
45    pub initial_full_snapshot: bool,
46    /// State graph configuration used by the embedded writer.
47    pub state_graph: StateGraphConfig,
48}
49
50impl Default for BidirectionalSyncConfig {
51    fn default() -> Self {
52        Self {
53            min_interval: Duration::from_millis(100),
54            initial_full_snapshot: true,
55            state_graph: StateGraphConfig::default(),
56        }
57    }
58}
59
60/// Outcome of a single push/pull pass.
61#[derive(Debug, Clone)]
62pub struct BidirectionalSyncReport {
63    /// Direction the orchestrator actually executed.
64    pub direction: SyncDirection,
65    /// Diff produced for the state-to-RDF leg (empty if direction was
66    /// [`SyncDirection::Skipped`] or [`SyncDirection::RdfToState`]).
67    pub diff: StateDiff,
68    /// Optional re-extracted state from the RDF leg.
69    pub re_extracted: Option<PhysicsState>,
70    /// SPARQL query string emitted to the consumer (when applicable).
71    pub sparql: Option<String>,
72}
73
74/// Periodic bidirectional sync orchestrator.
75pub struct BidirectionalSync {
76    config: BidirectionalSyncConfig,
77    writer: StateToRdfWriter,
78    extractor: RdfToStateExtractor,
79    /// Last full snapshot we observed; updated after every push.
80    last_snapshot: Option<PhysicsState>,
81    /// Wall-clock time of the most recent push or pull.
82    last_sync_at: Option<Instant>,
83    /// Whether we have ever pushed an initial snapshot.
84    has_pushed_initial: bool,
85}
86
87impl BidirectionalSync {
88    /// Build a new orchestrator from `config`.
89    pub fn new(config: BidirectionalSyncConfig) -> Self {
90        let writer = StateToRdfWriter::with_config(config.state_graph.clone());
91        Self {
92            config,
93            writer,
94            extractor: RdfToStateExtractor::new(),
95            last_snapshot: None,
96            last_sync_at: None,
97            has_pushed_initial: false,
98        }
99    }
100
101    /// Convenience constructor with default configuration.
102    pub fn with_defaults() -> Self {
103        Self::new(BidirectionalSyncConfig::default())
104    }
105
106    /// Read-only access to the embedded writer (e.g. for inspecting the
107    /// configured prefix in tests).
108    pub fn writer(&self) -> &StateToRdfWriter {
109        &self.writer
110    }
111
112    /// Returns `true` when the configured interval has elapsed since the
113    /// last sync.
114    pub fn ready(&self) -> bool {
115        match (self.last_sync_at, self.config.min_interval) {
116            (None, _) => true,
117            (Some(t), interval) => t.elapsed() >= interval,
118        }
119    }
120
121    /// Manually mark the orchestrator as just-synced (mainly for tests).
122    pub fn touch(&mut self) {
123        self.last_sync_at = Some(Instant::now());
124    }
125
126    /// Push the supplied `state` to RDF, returning the SPARQL query the
127    /// caller should execute. The very first call returns a full snapshot
128    /// (when `initial_full_snapshot` is true), subsequent calls return only
129    /// diffs.
130    ///
131    /// Returns [`SyncDirection::Skipped`] when the configured interval has
132    /// not yet elapsed.
133    ///
134    /// # Errors
135    ///
136    /// Returns [`PhysicsError::Internal`] when the supplied state is
137    /// malformed (empty entity IRI).
138    pub fn push_state(&mut self, state: &PhysicsState) -> PhysicsResult<BidirectionalSyncReport> {
139        if state.entity_iri.is_empty() {
140            return Err(PhysicsError::Internal(
141                "PhysicsState.entity_iri must not be empty".to_string(),
142            ));
143        }
144        if !self.ready() {
145            return Ok(BidirectionalSyncReport {
146                direction: SyncDirection::Skipped,
147                diff: StateDiff::default(),
148                re_extracted: None,
149                sparql: None,
150            });
151        }
152
153        let initial_due = self.config.initial_full_snapshot && !self.has_pushed_initial;
154        let (sparql, diff) = if initial_due || self.last_snapshot.is_none() {
155            let q = self.writer.render_full(state);
156            self.has_pushed_initial = true;
157            (Some(q), StateDiff::default())
158        } else {
159            let prev = match self.last_snapshot.as_ref() {
160                Some(p) => p,
161                None => {
162                    // Defensive: should be unreachable thanks to the branch
163                    // condition above.
164                    return Err(PhysicsError::Internal(
165                        "missing previous snapshot for diff".to_string(),
166                    ));
167                }
168            };
169            let d = state_diff(prev, state, self.config.state_graph.tolerance);
170            let q = self.writer.render_diff(prev, state);
171            (q, d)
172        };
173
174        // Update bookkeeping
175        self.last_snapshot = Some(state.clone());
176        self.last_sync_at = Some(Instant::now());
177
178        Ok(BidirectionalSyncReport {
179            direction: SyncDirection::StateToRdf,
180            diff,
181            re_extracted: None,
182            sparql,
183        })
184    }
185
186    /// Pull the most recent property rows for `entity_iri` at `step` from
187    /// the supplied closure and re-extract a [`PhysicsState`].
188    ///
189    /// # Errors
190    ///
191    /// Bubbles up any error returned by `fetch_rows` and any failure from
192    /// the inner extractor.
193    pub fn pull_state<F>(
194        &mut self,
195        entity_iri: &str,
196        step: u64,
197        fetch_rows: F,
198    ) -> PhysicsResult<BidirectionalSyncReport>
199    where
200        F: FnOnce(&str, u64) -> PhysicsResult<Vec<RdfPropertyRow>>,
201    {
202        if !self.ready() {
203            return Ok(BidirectionalSyncReport {
204                direction: SyncDirection::Skipped,
205                diff: StateDiff::default(),
206                re_extracted: None,
207                sparql: None,
208            });
209        }
210        let rows = fetch_rows(entity_iri, step)?;
211        let RdfToStateOutput { state, skipped } =
212            self.extractor.extract(entity_iri, step, &rows)?;
213        if !skipped.is_empty() {
214            tracing::debug!(
215                "bidirectional sync skipped {} unparseable predicates: {:?}",
216                skipped.len(),
217                skipped
218            );
219        }
220        let diff = match self.last_snapshot.as_ref() {
221            Some(prev) => state_diff(prev, &state, self.config.state_graph.tolerance),
222            None => StateDiff::default(),
223        };
224
225        self.last_snapshot = Some(state.clone());
226        self.last_sync_at = Some(Instant::now());
227
228        Ok(BidirectionalSyncReport {
229            direction: SyncDirection::RdfToState,
230            diff,
231            re_extracted: Some(state),
232            sparql: None,
233        })
234    }
235
236    /// Reset internal state so the next push emits a full snapshot again.
237    pub fn reset(&mut self) {
238        self.last_snapshot = None;
239        self.last_sync_at = None;
240        self.has_pushed_initial = false;
241    }
242}
243
#[cfg(test)]
mod tests {
    //! Unit tests for the orchestrator.
    //!
    //! Tests that need back-to-back passes use a zero `min_interval`, which
    //! makes `ready()` always true, so no sleeping is required.

    use super::*;

    /// Entity IRI shared by all fixtures.
    fn entity() -> &'static str {
        "urn:example:battery:001"
    }

    /// Build a state for `entity()` at `step` with two scalar properties.
    fn state_at(step: u64, voltage: f64, temperature: f64) -> PhysicsState {
        let mut s = PhysicsState::new(entity());
        s.step = step;
        s.set_scalar("voltage", voltage);
        s.set_scalar("temperature", temperature);
        s
    }

    /// Orchestrator that never throttles.
    fn unthrottled() -> BidirectionalSync {
        BidirectionalSync::new(BidirectionalSyncConfig {
            min_interval: Duration::from_millis(0),
            ..Default::default()
        })
    }

    /// Orchestrator with an interval long enough to never elapse in a test.
    fn throttled() -> BidirectionalSync {
        BidirectionalSync::new(BidirectionalSyncConfig {
            min_interval: Duration::from_secs(10),
            ..Default::default()
        })
    }

    #[test]
    fn first_push_emits_full_snapshot() {
        let mut sync = BidirectionalSync::with_defaults();
        let s = state_at(0, 3.7, 298.0);
        let report = sync.push_state(&s).expect("push should succeed");
        assert_eq!(report.direction, SyncDirection::StateToRdf);
        let q = report.sparql.expect("SPARQL must be produced");
        assert!(q.contains("INSERT DATA"));
        assert!(q.contains("phys:State"));
        assert!(q.contains(entity()));
    }

    #[test]
    fn second_push_emits_diff_only() {
        let mut sync = unthrottled();
        let s0 = state_at(0, 3.7, 298.0);
        let r0 = sync.push_state(&s0).expect("push should succeed");
        assert!(r0.diff.is_empty());

        let s1 = state_at(1, 3.95, 298.0); // only voltage changed
        let r1 = sync.push_state(&s1).expect("second push should succeed");
        assert_eq!(r1.direction, SyncDirection::StateToRdf);
        assert_eq!(r1.diff.changed.len(), 1);
        assert!(r1.diff.changed.contains_key("voltage"));
        let q = r1.sparql.expect("non-empty diff must produce SPARQL");
        assert!(q.contains("phys:voltage"));
        assert!(!q.contains("phys:temperature"));
    }

    #[test]
    fn skipped_when_interval_not_elapsed() {
        let mut sync = throttled();
        let s0 = state_at(0, 3.7, 298.0);
        sync.push_state(&s0).expect("first push");
        // Push immediately again — orchestrator must skip.
        let r = sync.push_state(&s0).expect("second push");
        assert_eq!(r.direction, SyncDirection::Skipped);
        assert!(r.sparql.is_none());
    }

    #[test]
    fn pull_skipped_when_interval_not_elapsed() {
        let mut sync = throttled();
        sync.touch();
        // The closure returns an error, so a successful result proves it
        // was never invoked.
        let r = sync
            .pull_state(entity(), 0, |_, _| {
                Err(PhysicsError::Internal(
                    "fetch must not run while throttled".to_string(),
                ))
            })
            .expect("throttled pull must not invoke the fetch closure");
        assert_eq!(r.direction, SyncDirection::Skipped);
        assert!(r.re_extracted.is_none());
        assert!(r.sparql.is_none());
    }

    #[test]
    fn ready_tracks_touch_and_reset() {
        let mut sync = throttled();
        assert!(sync.ready(), "fresh orchestrator is never throttled");
        sync.touch();
        assert!(!sync.ready(), "touch starts the throttle window");
        sync.reset();
        assert!(sync.ready(), "reset clears the throttle window");
    }

    #[test]
    fn empty_diff_yields_no_sparql() {
        let mut sync = unthrottled();
        let s = state_at(0, 3.7, 298.0);
        sync.push_state(&s).expect("first push");
        let r = sync.push_state(&s).expect("second push");
        assert_eq!(r.direction, SyncDirection::StateToRdf);
        assert!(r.diff.is_empty());
        // No diff means no SPARQL.
        assert!(r.sparql.is_none());
    }

    #[test]
    fn pull_state_round_trips() {
        let mut sync = unthrottled();
        let result = sync.pull_state(entity(), 0, |_, _| {
            Ok(vec![
                RdfPropertyRow {
                    predicate: "voltage".to_string(),
                    literal: "3.71".to_string(),
                    datatype: Some("http://www.w3.org/2001/XMLSchema#double".to_string()),
                },
                RdfPropertyRow {
                    predicate: "temperature".to_string(),
                    literal: "298.15".to_string(),
                    datatype: Some("http://www.w3.org/2001/XMLSchema#double".to_string()),
                },
            ])
        });
        let report = result.expect("pull should succeed");
        assert_eq!(report.direction, SyncDirection::RdfToState);
        let s = report.re_extracted.expect("must produce a state");
        assert_eq!(s.entity_iri, entity());
        assert_eq!(s.values.len(), 2);
    }

    #[test]
    fn reset_clears_last_snapshot() {
        let mut sync = unthrottled();
        let s0 = state_at(0, 3.7, 298.0);
        sync.push_state(&s0).expect("first push");
        sync.reset();
        let r = sync.push_state(&s0).expect("post-reset push");
        // After reset, the next push acts as a fresh initial snapshot.
        assert!(r.sparql.expect("sparql").contains("phys:State"));
    }

    #[test]
    fn empty_entity_iri_rejected() {
        let mut sync = BidirectionalSync::with_defaults();
        let mut s = PhysicsState::new("");
        s.set_scalar("voltage", 3.7);
        let r = sync.push_state(&s);
        assert!(matches!(r, Err(PhysicsError::Internal(_))));
    }
}