1use std::time::{Duration, Instant};
18
19use crate::error::{PhysicsError, PhysicsResult};
20
21use super::rdf_to_state::{RdfPropertyRow, RdfToStateExtractor, RdfToStateOutput};
22use super::state_to_rdf::{
23 state_diff, PhysicsState, StateDiff, StateGraphConfig, StateToRdfWriter,
24};
25
/// Which way a synchronisation call moved data, as recorded in
/// [`BidirectionalSyncReport::direction`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncDirection {
    /// Reported by `push_state` when the push was carried out: the in-memory
    /// state was rendered towards the RDF side.
    StateToRdf,
    /// Reported by `pull_state` when the pull was carried out: a state was
    /// re-extracted from RDF property rows.
    RdfToState,
    /// Reported by either call when the configured minimum interval had not
    /// yet elapsed and nothing was done.
    Skipped,
}
36
37#[derive(Debug, Clone)]
39pub struct BidirectionalSyncConfig {
40 pub min_interval: Duration,
43 pub initial_full_snapshot: bool,
46 pub state_graph: StateGraphConfig,
48}
49
50impl Default for BidirectionalSyncConfig {
51 fn default() -> Self {
52 Self {
53 min_interval: Duration::from_millis(100),
54 initial_full_snapshot: true,
55 state_graph: StateGraphConfig::default(),
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct BidirectionalSyncReport {
63 pub direction: SyncDirection,
65 pub diff: StateDiff,
68 pub re_extracted: Option<PhysicsState>,
70 pub sparql: Option<String>,
72}
73
74pub struct BidirectionalSync {
76 config: BidirectionalSyncConfig,
77 writer: StateToRdfWriter,
78 extractor: RdfToStateExtractor,
79 last_snapshot: Option<PhysicsState>,
81 last_sync_at: Option<Instant>,
83 has_pushed_initial: bool,
85}
86
87impl BidirectionalSync {
88 pub fn new(config: BidirectionalSyncConfig) -> Self {
90 let writer = StateToRdfWriter::with_config(config.state_graph.clone());
91 Self {
92 config,
93 writer,
94 extractor: RdfToStateExtractor::new(),
95 last_snapshot: None,
96 last_sync_at: None,
97 has_pushed_initial: false,
98 }
99 }
100
101 pub fn with_defaults() -> Self {
103 Self::new(BidirectionalSyncConfig::default())
104 }
105
106 pub fn writer(&self) -> &StateToRdfWriter {
109 &self.writer
110 }
111
112 pub fn ready(&self) -> bool {
115 match (self.last_sync_at, self.config.min_interval) {
116 (None, _) => true,
117 (Some(t), interval) => t.elapsed() >= interval,
118 }
119 }
120
121 pub fn touch(&mut self) {
123 self.last_sync_at = Some(Instant::now());
124 }
125
126 pub fn push_state(&mut self, state: &PhysicsState) -> PhysicsResult<BidirectionalSyncReport> {
139 if state.entity_iri.is_empty() {
140 return Err(PhysicsError::Internal(
141 "PhysicsState.entity_iri must not be empty".to_string(),
142 ));
143 }
144 if !self.ready() {
145 return Ok(BidirectionalSyncReport {
146 direction: SyncDirection::Skipped,
147 diff: StateDiff::default(),
148 re_extracted: None,
149 sparql: None,
150 });
151 }
152
153 let initial_due = self.config.initial_full_snapshot && !self.has_pushed_initial;
154 let (sparql, diff) = if initial_due || self.last_snapshot.is_none() {
155 let q = self.writer.render_full(state);
156 self.has_pushed_initial = true;
157 (Some(q), StateDiff::default())
158 } else {
159 let prev = match self.last_snapshot.as_ref() {
160 Some(p) => p,
161 None => {
162 return Err(PhysicsError::Internal(
165 "missing previous snapshot for diff".to_string(),
166 ));
167 }
168 };
169 let d = state_diff(prev, state, self.config.state_graph.tolerance);
170 let q = self.writer.render_diff(prev, state);
171 (q, d)
172 };
173
174 self.last_snapshot = Some(state.clone());
176 self.last_sync_at = Some(Instant::now());
177
178 Ok(BidirectionalSyncReport {
179 direction: SyncDirection::StateToRdf,
180 diff,
181 re_extracted: None,
182 sparql,
183 })
184 }
185
186 pub fn pull_state<F>(
194 &mut self,
195 entity_iri: &str,
196 step: u64,
197 fetch_rows: F,
198 ) -> PhysicsResult<BidirectionalSyncReport>
199 where
200 F: FnOnce(&str, u64) -> PhysicsResult<Vec<RdfPropertyRow>>,
201 {
202 if !self.ready() {
203 return Ok(BidirectionalSyncReport {
204 direction: SyncDirection::Skipped,
205 diff: StateDiff::default(),
206 re_extracted: None,
207 sparql: None,
208 });
209 }
210 let rows = fetch_rows(entity_iri, step)?;
211 let RdfToStateOutput { state, skipped } =
212 self.extractor.extract(entity_iri, step, &rows)?;
213 if !skipped.is_empty() {
214 tracing::debug!(
215 "bidirectional sync skipped {} unparseable predicates: {:?}",
216 skipped.len(),
217 skipped
218 );
219 }
220 let diff = match self.last_snapshot.as_ref() {
221 Some(prev) => state_diff(prev, &state, self.config.state_graph.tolerance),
222 None => StateDiff::default(),
223 };
224
225 self.last_snapshot = Some(state.clone());
226 self.last_sync_at = Some(Instant::now());
227
228 Ok(BidirectionalSyncReport {
229 direction: SyncDirection::RdfToState,
230 diff,
231 re_extracted: Some(state),
232 sparql: None,
233 })
234 }
235
236 pub fn reset(&mut self) {
238 self.last_snapshot = None;
239 self.last_sync_at = None;
240 self.has_pushed_initial = false;
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// IRI of the single entity every test operates on.
    const ENTITY_IRI: &str = "urn:example:battery:001";
    /// Datatype IRI attached to the numeric literals fed to `pull_state`.
    const XSD_DOUBLE: &str = "http://www.w3.org/2001/XMLSchema#double";

    /// Synchroniser whose throttle interval is zero.
    fn unthrottled_sync() -> BidirectionalSync {
        let config = BidirectionalSyncConfig {
            min_interval: Duration::from_millis(0),
            ..Default::default()
        };
        BidirectionalSync::new(config)
    }

    /// State for the test entity with the given step, voltage and temperature.
    fn state_at(step: u64, voltage: f64, temperature: f64) -> PhysicsState {
        let mut state = PhysicsState::new(ENTITY_IRI);
        state.step = step;
        state.set_scalar("voltage", voltage);
        state.set_scalar("temperature", temperature);
        state
    }

    /// Property row carrying an `xsd:double` literal.
    fn double_row(predicate: &str, literal: &str) -> RdfPropertyRow {
        RdfPropertyRow {
            predicate: predicate.to_string(),
            literal: literal.to_string(),
            datatype: Some(XSD_DOUBLE.to_string()),
        }
    }

    /// Lets a millisecond pass between consecutive syncs.
    fn tick() {
        sleep(Duration::from_millis(1));
    }

    #[test]
    fn first_push_emits_full_snapshot() {
        let mut sync = BidirectionalSync::with_defaults();
        let initial = state_at(0, 3.7, 298.0);

        let report = sync.push_state(&initial).expect("push should succeed");

        assert_eq!(report.direction, SyncDirection::StateToRdf);
        let query = report.sparql.expect("SPARQL must be produced");
        assert!(query.contains("INSERT DATA"));
        assert!(query.contains("phys:State"));
        assert!(query.contains(ENTITY_IRI));
    }

    #[test]
    fn second_push_emits_diff_only() {
        let mut sync = unthrottled_sync();

        let before = state_at(0, 3.7, 298.0);
        let first = sync.push_state(&before).expect("push should succeed");
        assert!(first.diff.is_empty());

        // Only the voltage differs from the first state.
        let after = state_at(1, 3.95, 298.0);
        tick();
        let second = sync.push_state(&after).expect("second push should succeed");

        assert_eq!(second.direction, SyncDirection::StateToRdf);
        assert_eq!(second.diff.changed.len(), 1);
        assert!(second.diff.changed.contains_key("voltage"));
        let query = second.sparql.expect("non-empty diff must produce SPARQL");
        assert!(query.contains("phys:voltage"));
        assert!(!query.contains("phys:temperature"));
    }

    #[test]
    fn skipped_when_interval_not_elapsed() {
        let config = BidirectionalSyncConfig {
            min_interval: Duration::from_secs(10),
            ..Default::default()
        };
        let mut sync = BidirectionalSync::new(config);
        let state = state_at(0, 3.7, 298.0);

        sync.push_state(&state).expect("first push");
        let throttled = sync.push_state(&state).expect("second push");

        assert_eq!(throttled.direction, SyncDirection::Skipped);
        assert!(throttled.sparql.is_none());
    }

    #[test]
    fn empty_diff_yields_no_sparql() {
        let mut sync = unthrottled_sync();
        let state = state_at(0, 3.7, 298.0);

        sync.push_state(&state).expect("first push");
        tick();
        let repeat = sync.push_state(&state).expect("second push");

        assert_eq!(repeat.direction, SyncDirection::StateToRdf);
        assert!(repeat.diff.is_empty());
        assert!(repeat.sparql.is_none());
    }

    #[test]
    fn pull_state_round_trips() {
        let mut sync = unthrottled_sync();

        let outcome = sync.pull_state(ENTITY_IRI, 0, |_, _| {
            Ok(vec![
                double_row("voltage", "3.71"),
                double_row("temperature", "298.15"),
            ])
        });

        let report = outcome.expect("pull should succeed");
        assert_eq!(report.direction, SyncDirection::RdfToState);
        let pulled = report.re_extracted.expect("must produce a state");
        assert_eq!(pulled.entity_iri, ENTITY_IRI);
        assert_eq!(pulled.values.len(), 2);
    }

    #[test]
    fn reset_clears_last_snapshot() {
        let mut sync = unthrottled_sync();
        let state = state_at(0, 3.7, 298.0);

        sync.push_state(&state).expect("first push");
        sync.reset();
        tick();
        let report = sync.push_state(&state).expect("post-reset push");

        // A full snapshot (not a diff) is expected again after the reset.
        assert!(report.sparql.expect("sparql").contains("phys:State"));
    }

    #[test]
    fn empty_entity_iri_rejected() {
        let mut sync = BidirectionalSync::with_defaults();
        let mut nameless = PhysicsState::new("");
        nameless.set_scalar("voltage", 3.7);

        let outcome = sync.push_state(&nameless);

        assert!(matches!(outcome, Err(PhysicsError::Internal(_))));
    }
}