// stateset_sync/engine.rs
1use std::collections::HashSet;
2
3use chrono::Utc;
4
5use crate::buffer::EventBuffer;
6use crate::config::SyncConfig;
7use crate::conflict::{ConflictResolver, ConflictStrategy, Resolution};
8use crate::error::SyncError;
9use crate::event::SyncEvent;
10use crate::outbox::Outbox;
11use crate::state::{SyncState, SyncStatus};
12use crate::transport::{PullPage, PullResult, PushResult, Transport, derive_next_cursor};
13
/// Safety stop for paginated pull loops in `full_sync`: pagination aborts
/// with a transport error after this many pages, guarding against a remote
/// that keeps reporting `has_more == true`.
const MAX_PULL_PAGES: usize = 10_000;
16
/// The sync engine orchestrates synchronization between local state and
/// a remote sequencer.
///
/// This is the Rust equivalent of the JS `SyncEngine` class, providing:
/// - Event recording to the outbox
/// - Push (outbox -> remote) via a [`Transport`]
/// - Pull (remote -> buffer) via a [`Transport`]
/// - Conflict resolution during pull
/// - Status reporting
///
/// # Examples
///
/// ```
/// use stateset_sync::{SyncEngine, SyncConfig, SyncEvent};
/// use serde_json::json;
///
/// let config = SyncConfig::new("agent-1", "tenant-1", "store-1");
/// let mut engine = SyncEngine::new(config).expect("valid sync config");
///
/// let seq = engine.record(SyncEvent::new("order.created", "order", "ORD-1", json!({"total": 99})));
/// assert!(seq.is_ok());
/// assert_eq!(engine.pending_count(), 1);
/// ```
#[derive(Debug)]
pub struct SyncEngine {
    // Validated configuration: batch size, buffer/outbox capacities, persistence path.
    config: SyncConfig,
    // Mutable progress: local/remote heads, pending count, last push/pull timestamps.
    state: SyncState,
    // Queue of locally recorded, not-yet-pushed events (optionally durable).
    outbox: Outbox,
    // Holding area for pulled (and conflict-resolved) remote events until drained.
    buffer: EventBuffer,
    // Applies the configured ConflictStrategy during pull.
    resolver: ConflictResolver,
    // Highest remote sequence fully consumed across completed pulls.
    last_pulled_sequence: u64,
    // Continuation cursor for an in-progress paginated pull, if any.
    next_pull_cursor: Option<u64>,
    // Set true on successful construction; reported via `status()`.
    initialized: bool,
}
51
impl SyncEngine {
    /// Create a new `SyncEngine` with the given configuration.
    ///
    /// # Errors
    ///
    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
    /// [`SyncError::Storage`] when durable outbox initialization fails.
    pub fn new(config: SyncConfig) -> Result<Self, SyncError> {
        Self::try_new(config)
    }

    /// Create a `SyncEngine` with a custom conflict resolution strategy.
    ///
    /// # Errors
    ///
    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
    /// [`SyncError::Storage`] when durable outbox initialization fails.
    pub fn with_strategy(
        config: SyncConfig,
        strategy: ConflictStrategy,
    ) -> Result<Self, SyncError> {
        Self::try_with_strategy(config, strategy)
    }

    /// Fallible constructor that validates config and initializes persistence.
    ///
    /// # Errors
    ///
    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
    /// [`SyncError::Storage`] when durable outbox initialization fails.
    pub fn try_new(config: SyncConfig) -> Result<Self, SyncError> {
        Self::try_with_strategy(config, ConflictStrategy::default())
    }

    /// Fallible constructor with explicit conflict strategy.
    ///
    /// # Errors
    ///
    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
    /// [`SyncError::Storage`] when durable outbox initialization fails.
    pub fn try_with_strategy(
        config: SyncConfig,
        strategy: ConflictStrategy,
    ) -> Result<Self, SyncError> {
        config.validate()?;
        let buffer_capacity = config.resolved_buffer_capacity();
        // A configured path selects the durable outbox (which may restore
        // previously persisted pending events); otherwise stay in-memory.
        let outbox = if let Some(path) = config.outbox_path.as_deref() {
            Outbox::with_persistence(config.resolved_outbox_capacity(), path)?
        } else {
            Outbox::new(config.resolved_outbox_capacity())
        };

        Ok(Self {
            config,
            state: SyncState::default(),
            outbox,
            buffer: EventBuffer::new(buffer_capacity),
            resolver: ConflictResolver::new(strategy),
            last_pulled_sequence: 0,
            next_pull_cursor: None,
            initialized: true,
        })
    }

    /// Record an event into the outbox for later push.
    ///
    /// Returns the assigned local sequence number.
    ///
    /// # Errors
    ///
    /// Returns [`SyncError::OutboxFull`] if the outbox is at capacity.
    pub fn record(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
        let seq = self.outbox.append(event)?;
        // Keep cached state in step with the outbox it mirrors.
        self.state.local_head = seq;
        self.state.pending_count = self.outbox.count();
        Ok(seq)
    }

    /// Push pending events from the outbox to the remote via the given transport.
    ///
    /// Drains up to `batch_size` events from the outbox.
    ///
    /// # Errors
    ///
    /// Returns [`SyncError::Transport`] if the transport operation fails.
    pub async fn push(&mut self, transport: &dyn Transport) -> Result<PushResult, SyncError> {
        let batch_size = self.config.resolved_batch_size();
        // Peek (not drain) so a failed transport call leaves the outbox intact:
        // at-least-once delivery semantics.
        let events: Vec<SyncEvent> = self.outbox.peek(batch_size).into_iter().cloned().collect();

        if events.is_empty() {
            return Ok(PushResult { accepted: 0, remote_head: self.state.remote_head });
        }

        let result = transport.push_events(&events).await?;
        // Clamp: never drain more than we actually sent, even if the remote
        // over-reports its accepted count.
        let accepted = result.accepted.min(events.len());

        if accepted > 0 {
            if let Err(err) = self.outbox.drain(accepted) {
                // Drain (ack persistence) failed: re-sync the pending counter
                // and surface the storage error without touching push state.
                self.state.pending_count = self.outbox.count();
                return Err(err);
            }
        }

        self.state.remote_head = result.remote_head;
        self.state.last_push = Some(Utc::now());
        self.state.pending_count = self.outbox.count();

        Ok(result)
    }

    /// Pull events from the remote sequencer into the local buffer.
    ///
    /// Pulled events are added to the event buffer. If conflicts are
    /// detected (same `entity_type` + `entity_id` in both outbox and pulled),
    /// they are resolved using the configured strategy.
    ///
    /// # Errors
    ///
    /// Returns [`SyncError::Transport`] if the transport operation fails.
    pub async fn pull(&mut self, transport: &dyn Transport) -> Result<PullResult, SyncError> {
        // Resume a paginated pull if a continuation cursor is outstanding;
        // otherwise start from the last fully consumed remote sequence.
        let since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
        let (result, next_cursor) = self.pull_since(transport, since).await?;
        self.next_pull_cursor = next_cursor;
        Ok(result)
    }

    async fn pull_since(
        &mut self,
        transport: &dyn Transport,
        since: u64,
    ) -> Result<(PullResult, Option<u64>), SyncError> {
        let limit = self.config.resolved_batch_size();
        let PullPage { result, next_cursor: transport_next_cursor } =
            transport.pull_events_page(since, limit).await?;

        // Detect and resolve conflicts between pending outbox events and pulled events
        let pending: Vec<SyncEvent> =
            self.outbox.peek(self.outbox.count()).into_iter().cloned().collect();
        let mut drop_local_ids = HashSet::new();
        let mut events_to_buffer = Vec::with_capacity(result.events.len());

        for pulled_event in &result.events {
            let mut keep_remote = true;

            // Compare against the newest pending write for the same entity
            // (reverse iteration finds the most recently recorded one).
            if let Some(local_event) = pending.iter().rev().find(|local_event| {
                local_event.entity_type == pulled_event.entity_type
                    && local_event.entity_id == pulled_event.entity_id
            }) {
                match self.resolver.resolve(local_event, pulled_event) {
                    Resolution::KeepLocal => {
                        // Local pending write wins; discard the remote copy.
                        keep_remote = false;
                    }
                    Resolution::KeepRemote => {
                        // Remote wins; schedule the conflicting local event for removal.
                        drop_local_ids.insert(local_event.id);
                    }
                    Resolution::Merge(merged) => {
                        // Merged result replaces both sides.
                        drop_local_ids.insert(local_event.id);
                        events_to_buffer.push(merged);
                        keep_remote = false;
                    }
                }
            }

            if keep_remote {
                events_to_buffer.push(pulled_event.clone());
            }
        }

        if !drop_local_ids.is_empty() {
            self.outbox.try_retain(|event| !drop_local_ids.contains(&event.id))?;
            self.state.pending_count = self.outbox.count();
        }

        // Buffer resolved events
        for event in events_to_buffer {
            self.buffer.push(event);
        }

        self.state.remote_head = result.remote_head;
        self.state.last_pull = Some(Utc::now());
        // Record pull progress: prefer an advancing transport cursor, fall back
        // to a cursor derived from the page's event sequences, else stay put.
        let observed_cursor = transport_next_cursor
            .filter(|cursor| *cursor > since)
            .or_else(|| derive_next_cursor(since, &result.events))
            .unwrap_or(since);
        self.last_pulled_sequence = self.last_pulled_sequence.max(observed_cursor);

        let next_cursor = Self::resolve_next_cursor(
            since,
            &result.events,
            result.has_more,
            transport_next_cursor,
        )?;

        Ok((result, next_cursor))
    }

    // Compute the continuation cursor for the next page, enforcing that
    // pagination actually advances; `None` means pagination is complete.
    fn resolve_next_cursor(
        since: u64,
        events: &[SyncEvent],
        has_more: bool,
        transport_next_cursor: Option<u64>,
    ) -> Result<Option<u64>, SyncError> {
        if !has_more {
            return Ok(None);
        }

        let next_cursor = transport_next_cursor.or_else(|| derive_next_cursor(since, events));
        let Some(next_cursor) = next_cursor else {
            return Err(SyncError::Transport(
                "pull pagination stalled: has_more=true but no advancing cursor available"
                    .to_string(),
            ));
        };
        // A non-advancing cursor would loop forever; treat it as a transport bug.
        if next_cursor <= since {
            return Err(SyncError::Transport(format!(
                "pull pagination cursor did not advance (since={since}, next_cursor={next_cursor})"
            )));
        }
        Ok(Some(next_cursor))
    }

    /// Get the current sync status.
    #[must_use]
    pub fn status(&self) -> SyncStatus {
        SyncStatus {
            initialized: self.initialized,
            local_head: self.state.local_head,
            remote_head: self.state.remote_head,
            // Read live counts from the outbox/buffer rather than cached state.
            pending: self.outbox.count(),
            lag: self.state.lag(),
            last_push: self.state.last_push,
            last_pull: self.state.last_pull,
            buffered_events: self.buffer.len(),
        }
    }

    /// Return the number of events pending in the outbox.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.outbox.count()
    }

    /// Return the number of events currently in the pull buffer.
    #[must_use]
    pub fn buffered_count(&self) -> usize {
        self.buffer.len()
    }

    /// Drain all events from the pull buffer.
    pub fn drain_buffer(&mut self) -> Vec<SyncEvent> {
        self.buffer.drain_all()
    }

    /// Return a reference to the current sync state.
    #[must_use]
    pub const fn state(&self) -> &SyncState {
        &self.state
    }

    /// Return a reference to the sync configuration.
    #[must_use]
    pub const fn config(&self) -> &SyncConfig {
        &self.config
    }

    /// Return a reference to the conflict resolver.
    #[must_use]
    pub const fn resolver(&self) -> &ConflictResolver {
        &self.resolver
    }

    /// Perform a full sync: push first, then pull.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered during push or pull.
    pub async fn full_sync(
        &mut self,
        transport: &dyn Transport,
    ) -> Result<(PushResult, PullResult), SyncError> {
        let push_result = self.push(transport).await?;
        let mut since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
        let (mut pull_result, next_cursor) = self.pull_since(transport, since).await?;
        self.next_pull_cursor = next_cursor;
        let mut pull_pages = 1;

        // Keep pulling pages until the remote reports no more, folding each
        // page into the aggregate result returned to the caller.
        while pull_result.has_more {
            if pull_pages >= MAX_PULL_PAGES {
                return Err(SyncError::Transport(
                    "pull pagination exceeded safety limit".to_string(),
                ));
            }

            since = self.next_pull_cursor.ok_or_else(|| {
                SyncError::Transport(
                    "pull pagination stalled: has_more=true but no continuation cursor".to_string(),
                )
            })?;

            let (next_page, page_cursor) = self.pull_since(transport, since).await?;
            self.next_pull_cursor = page_cursor;

            pull_result.events.extend(next_page.events);
            pull_result.remote_head = next_page.remote_head;
            pull_result.has_more = next_page.has_more;
            pull_pages += 1;
        }

        // Pagination completed; clear any stale continuation cursor.
        self.next_pull_cursor = None;
        Ok((push_result, pull_result))
    }
}
364
365#[cfg(test)]
366mod tests {
367    use super::*;
368    use crate::transport::NullTransport;
369    use proptest::prelude::*;
370    use serde_json::json;
371    use std::sync::Arc;
372    use std::sync::Mutex;
373    use std::sync::atomic::{AtomicU64, Ordering};
374    use tempfile::tempdir;
375
    /// Minimal valid configuration shared by most tests.
    fn make_config() -> SyncConfig {
        SyncConfig::new("agent-1", "tenant-1", "store-1")
    }
379
    /// Event fixture: given type, fixed entity `order`/`ORD-1`, empty payload.
    fn make_event(event_type: &str) -> SyncEvent {
        SyncEvent::new(event_type, "order", "ORD-1", json!({}))
    }
383
384    #[test]
385    fn new_engine() {
386        let engine = SyncEngine::new(make_config()).unwrap();
387        assert_eq!(engine.pending_count(), 0);
388        assert_eq!(engine.buffered_count(), 0);
389        assert!(engine.status().initialized);
390    }
391
392    #[test]
393    fn record_event() {
394        let mut engine = SyncEngine::new(make_config()).unwrap();
395        let seq = engine.record(make_event("order.created")).unwrap();
396        assert_eq!(seq, 1);
397        assert_eq!(engine.pending_count(), 1);
398        assert_eq!(engine.state().local_head, 1);
399    }
400
401    #[test]
402    fn record_multiple_events() {
403        let mut engine = SyncEngine::new(make_config()).unwrap();
404        engine.record(make_event("a")).unwrap();
405        engine.record(make_event("b")).unwrap();
406        engine.record(make_event("c")).unwrap();
407        assert_eq!(engine.pending_count(), 3);
408        assert_eq!(engine.state().local_head, 3);
409    }
410
411    #[tokio::test]
412    async fn push_with_null_transport() {
413        let mut engine = SyncEngine::new(make_config()).unwrap();
414        engine.record(make_event("a")).unwrap();
415        engine.record(make_event("b")).unwrap();
416
417        let transport = NullTransport::new();
418        let result = engine.push(&transport).await.unwrap();
419        assert_eq!(result.accepted, 2);
420        assert_eq!(engine.pending_count(), 0);
421        assert!(engine.state().last_push.is_some());
422    }
423
424    #[tokio::test]
425    async fn push_empty_outbox() {
426        let mut engine = SyncEngine::new(make_config()).unwrap();
427        let transport = NullTransport::new();
428        let result = engine.push(&transport).await.unwrap();
429        assert_eq!(result.accepted, 0);
430    }
431
432    #[tokio::test]
433    async fn pull_with_null_transport() {
434        let mut engine = SyncEngine::new(make_config()).unwrap();
435        let transport = NullTransport::new();
436        let result = engine.pull(&transport).await.unwrap();
437        assert!(result.events.is_empty());
438        assert!(!result.has_more);
439        assert!(engine.state().last_pull.is_some());
440    }
441
    #[tokio::test]
    async fn pull_buffers_events() {
        // Pulled events should land in the engine's buffer and advance remote_head.
        /// Mock transport that returns predefined events on pull.
        #[derive(Debug)]
        struct MockPullTransport {
            events: Vec<SyncEvent>,
            head: u64,
        }

        #[async_trait::async_trait]
        impl Transport for MockPullTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: self.head })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult {
                    events: self.events.clone(),
                    remote_head: self.head,
                    has_more: false,
                })
            }
        }

        let transport = MockPullTransport {
            events: vec![
                make_event("pulled-1").with_sequence(1),
                make_event("pulled-2").with_sequence(2),
            ],
            head: 2,
        };

        let mut engine = SyncEngine::new(make_config()).unwrap();
        let result = engine.pull(&transport).await.unwrap();
        // Both remote events are returned and buffered; state tracks the head.
        assert_eq!(result.events.len(), 2);
        assert_eq!(engine.buffered_count(), 2);
        assert_eq!(engine.state().remote_head, 2);
    }
483
484    #[tokio::test]
485    async fn full_sync() {
486        let mut engine = SyncEngine::new(make_config()).unwrap();
487        engine.record(make_event("local")).unwrap();
488
489        let transport = NullTransport::new();
490        let (push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
491        assert_eq!(push_result.accepted, 1);
492        assert!(pull_result.events.is_empty());
493        assert_eq!(engine.pending_count(), 0);
494    }
495
496    #[test]
497    fn status_reporting() {
498        let mut engine = SyncEngine::new(make_config()).unwrap();
499        engine.record(make_event("a")).unwrap();
500        engine.record(make_event("b")).unwrap();
501
502        let status = engine.status();
503        assert!(status.initialized);
504        assert_eq!(status.pending, 2);
505        assert_eq!(status.local_head, 2);
506        assert_eq!(status.remote_head, 0);
507        assert_eq!(status.lag, 0);
508        assert!(status.last_push.is_none());
509    }
510
511    #[test]
512    fn drain_buffer() {
513        let mut engine = SyncEngine::new(make_config()).unwrap();
514        // Manually push to buffer via engine internals
515        engine.buffer.push(make_event("buffered"));
516        assert_eq!(engine.buffered_count(), 1);
517
518        let drained = engine.drain_buffer();
519        assert_eq!(drained.len(), 1);
520        assert_eq!(engine.buffered_count(), 0);
521    }
522
523    #[test]
524    fn engine_with_strategy() {
525        let engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins).unwrap();
526        assert_eq!(engine.resolver().strategy(), ConflictStrategy::LocalWins);
527    }
528
529    #[test]
530    fn config_accessor() {
531        let config = make_config();
532        let engine = SyncEngine::new(config).unwrap();
533        assert_eq!(engine.config().agent_id, "agent-1");
534    }
535
536    #[test]
537    fn try_new_rejects_invalid_config() {
538        let bad = SyncConfig::new("", "tenant", "store");
539        assert!(SyncEngine::try_new(bad).is_err());
540    }
541
    #[test]
    fn try_new_with_persistent_outbox_restores_pending_events() {
        // Events recorded against a durable outbox must survive engine restart.
        let dir = tempdir().unwrap();
        let outbox_path = dir.path().join("sync-outbox.json");
        let path_str = outbox_path.to_string_lossy().to_string();

        // First engine instance writes two pending events, then is dropped.
        {
            let mut engine = SyncEngine::try_new(
                SyncConfig::new("agent-1", "tenant-1", "store-1")
                    .with_outbox_path(path_str.clone()),
            )
            .unwrap();
            engine.record(make_event("persisted-a")).unwrap();
            engine.record(make_event("persisted-b")).unwrap();
            assert_eq!(engine.pending_count(), 2);
        }

        // A second instance over the same path restores the pending events.
        let engine = SyncEngine::try_new(
            SyncConfig::new("agent-1", "tenant-1", "store-1").with_outbox_path(path_str),
        )
        .unwrap();
        assert_eq!(engine.pending_count(), 2);
    }
565
566    #[tokio::test]
567    async fn push_respects_batch_size() {
568        let config = SyncConfig::new("agent-1", "tenant-1", "store-1").with_batch_size(2);
569        let mut engine = SyncEngine::new(config).unwrap();
570        engine.record(make_event("a")).unwrap();
571        engine.record(make_event("b")).unwrap();
572        engine.record(make_event("c")).unwrap();
573
574        let transport = NullTransport::new();
575        let result = engine.push(&transport).await.unwrap();
576        // Should only push 2 due to batch_size
577        assert_eq!(result.accepted, 2);
578        assert_eq!(engine.pending_count(), 1);
579    }
580
    #[tokio::test]
    async fn push_updates_state() {
        // The engine's cached remote_head must track the head the transport reports.
        /// Mock transport that returns an increasing remote head.
        #[derive(Debug)]
        struct MockHeadTransport {
            head: Arc<AtomicU64>,
        }

        #[async_trait::async_trait]
        impl Transport for MockHeadTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                // Advance the shared head by the batch size and report the new value.
                let new_head = self.head.fetch_add(events.len() as u64, Ordering::SeqCst)
                    + events.len() as u64;
                Ok(PushResult { accepted: events.len(), remote_head: new_head })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult {
                    events: vec![],
                    remote_head: self.head.load(Ordering::SeqCst),
                    has_more: false,
                })
            }
        }

        let transport = MockHeadTransport { head: Arc::new(AtomicU64::new(0)) };

        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();

        let result = engine.push(&transport).await.unwrap();
        assert_eq!(result.remote_head, 2);
        assert_eq!(engine.state().remote_head, 2);
    }
619
    #[tokio::test]
    async fn transport_error_propagates() {
        // Transport failures surface as SyncError::Transport and leave state intact.
        /// Transport that always fails.
        #[derive(Debug)]
        struct FailTransport;

        #[async_trait::async_trait]
        impl Transport for FailTransport {
            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Err(SyncError::Transport("network down".into()))
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Err(SyncError::Transport("network down".into()))
            }
        }

        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();

        let transport = FailTransport;
        let result = engine.push(&transport).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), SyncError::Transport(_)));
        // Failed push must not drop local events.
        assert_eq!(engine.pending_count(), 1);

        // Pull fails the same way.
        let pull_result = engine.pull(&transport).await;
        assert!(pull_result.is_err());
    }
653
    #[tokio::test]
    async fn push_only_drains_accepted_events() {
        // Partially accepted batches leave the unaccepted remainder in the outbox.
        /// Transport that only accepts one event from each batch.
        #[derive(Debug)]
        struct PartialAcceptTransport;

        #[async_trait::async_trait]
        impl Transport for PartialAcceptTransport {
            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: 1, remote_head: 1 })
            }

            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult { events: vec![], remote_head: 1, has_more: false })
            }
        }

        let mut engine = SyncEngine::new(make_config()).unwrap();
        engine.record(make_event("a")).unwrap();
        engine.record(make_event("b")).unwrap();
        engine.record(make_event("c")).unwrap();

        // Only the single accepted event is drained; two remain for the next push.
        let result = engine.push(&PartialAcceptTransport).await.unwrap();
        assert_eq!(result.accepted, 1);
        assert_eq!(engine.pending_count(), 2);
    }
684
    #[tokio::test]
    async fn push_returns_storage_error_when_ack_persist_fails() {
        // If persisting the drain (ack) fails, push must surface a Storage error
        // and leave the push-related state untouched.
        #[derive(Debug)]
        struct AcceptAllTransport;

        #[async_trait::async_trait]
        impl Transport for AcceptAllTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: events.len() as u64 })
            }

            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                Ok(PullResult { events: vec![], remote_head: 0, has_more: false })
            }
        }

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("outbox.json");
        let config = make_config().with_outbox_path(path.to_string_lossy().into_owned());
        let mut engine = SyncEngine::new(config).unwrap();
        engine.record(make_event("a")).unwrap();

        // Replace the outbox file with a directory so the next persist write fails.
        std::fs::remove_file(&path).unwrap();
        std::fs::create_dir(&path).unwrap();

        let err = engine.push(&AcceptAllTransport).await.unwrap_err();
        assert!(matches!(err, SyncError::Storage(_)));
        // The event stays pending and no push state was committed.
        assert_eq!(engine.pending_count(), 1);
        assert_eq!(engine.state().remote_head, 0);
        assert!(engine.state().last_push.is_none());
    }
720
    #[tokio::test]
    async fn pull_conflict_resolution() {
        // RemoteWins: a pulled event for the same entity removes the pending
        // local event and the remote copy is buffered.
        /// Transport that returns events conflicting with local outbox.
        #[derive(Debug)]
        struct ConflictTransport;

        #[async_trait::async_trait]
        impl Transport for ConflictTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 10 })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                // Return an event for the same entity as the pending local event
                let remote_event =
                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
                        .with_sequence(5);
                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
            }
        }

        let mut engine =
            SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins).unwrap();
        engine
            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
            .unwrap();

        let transport = ConflictTransport;
        let result = engine.pull(&transport).await.unwrap();
        assert_eq!(result.events.len(), 1);
        // RemoteWins removes conflicting local outbox events and keeps pulled event.
        assert_eq!(engine.buffered_count(), 1);
        assert_eq!(engine.pending_count(), 0);
    }
758
    #[tokio::test]
    async fn pull_conflict_remote_wins_only_drops_latest_pending_event() {
        // With several pending events for the same entity, RemoteWins drops only
        // the most recently recorded one; earlier pending events are preserved.
        #[derive(Debug)]
        struct ConflictTransport;

        #[async_trait::async_trait]
        impl Transport for ConflictTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 10 })
            }

            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                let remote_event =
                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
                        .with_sequence(5);
                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
            }
        }

        let mut engine =
            SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins).unwrap();
        engine
            .record(SyncEvent::new("order.note_added", "order", "ORD-1", json!({"note": "a"})))
            .unwrap();
        engine
            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
            .unwrap();

        engine.pull(&ConflictTransport).await.unwrap();

        // Only the newer `order.updated` was dropped; `order.note_added` survives.
        let pending: Vec<_> = engine.outbox.peek(10).into_iter().cloned().collect();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].event_type, "order.note_added");
        assert_eq!(engine.buffered_count(), 1);
    }
798
    #[tokio::test]
    async fn pull_conflict_local_wins_keeps_pending_and_skips_remote() {
        // LocalWins: the pending local event stays queued and the conflicting
        // remote event is not buffered (though still returned in the result).
        #[derive(Debug)]
        struct ConflictTransport;

        #[async_trait::async_trait]
        impl Transport for ConflictTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 10 })
            }
            async fn pull_events(
                &self,
                _since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                let remote_event =
                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
                        .with_sequence(5);
                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
            }
        }

        let mut engine =
            SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins).unwrap();
        engine
            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
            .unwrap();

        let result = engine.pull(&ConflictTransport).await.unwrap();
        assert_eq!(result.events.len(), 1);
        assert_eq!(engine.pending_count(), 1);
        assert_eq!(engine.buffered_count(), 0);
    }
832
833    #[tokio::test]
834    async fn full_sync_paginates_pull_until_complete() {
835        #[derive(Debug)]
836        struct PagingTransport {
837            pulls: Arc<AtomicU64>,
838            since_args: Arc<Mutex<Vec<u64>>>,
839        }
840
841        #[async_trait::async_trait]
842        impl Transport for PagingTransport {
843            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
844                Ok(PushResult { accepted: events.len(), remote_head: 0 })
845            }
846
847            async fn pull_events(
848                &self,
849                since: u64,
850                _limit: usize,
851            ) -> Result<PullResult, SyncError> {
852                self.since_args.lock().unwrap().push(since);
853                let call = self.pulls.fetch_add(1, Ordering::SeqCst);
854                if call == 0 {
855                    Ok(PullResult {
856                        events: vec![
857                            SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
858                                .with_sequence(1),
859                        ],
860                        // Simulate remote_head as global head watermark, not page cursor.
861                        remote_head: 999,
862                        has_more: true,
863                    })
864                } else {
865                    Ok(PullResult {
866                        events: vec![
867                            SyncEvent::new("order.updated", "order", "ORD-2", json!({}))
868                                .with_sequence(2),
869                        ],
870                        remote_head: 999,
871                        has_more: false,
872                    })
873                }
874            }
875        }
876
877        let since_args = Arc::new(Mutex::new(Vec::new()));
878        let transport = PagingTransport {
879            pulls: Arc::new(AtomicU64::new(0)),
880            since_args: Arc::clone(&since_args),
881        };
882        let mut engine = SyncEngine::new(make_config()).unwrap();
883        let (_push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
884
885        assert_eq!(pull_result.events.len(), 2);
886        assert!(!pull_result.has_more);
887        assert_eq!(engine.buffered_count(), 2);
888        assert_eq!(engine.state().remote_head, 999);
889        assert_eq!(transport.pulls.load(Ordering::SeqCst), 2);
890        assert_eq!(&*since_args.lock().unwrap(), &[0, 1]);
891    }
892
893    #[tokio::test]
894    async fn pull_cursor_does_not_advance_from_push_remote_head() {
895        #[derive(Debug)]
896        struct HeadSkewTransport {
897            since_args: Arc<Mutex<Vec<u64>>>,
898        }
899
900        #[async_trait::async_trait]
901        impl Transport for HeadSkewTransport {
902            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
903                Ok(PushResult { accepted: events.len(), remote_head: 1000 })
904            }
905
906            async fn pull_events(
907                &self,
908                since: u64,
909                _limit: usize,
910            ) -> Result<PullResult, SyncError> {
911                self.since_args.lock().unwrap().push(since);
912                Ok(PullResult {
913                    events: vec![
914                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
915                            .with_sequence(7),
916                    ],
917                    remote_head: 1000,
918                    has_more: false,
919                })
920            }
921        }
922
923        let since_args = Arc::new(Mutex::new(Vec::new()));
924        let transport = HeadSkewTransport { since_args: Arc::clone(&since_args) };
925        let mut engine = SyncEngine::new(make_config()).unwrap();
926
927        engine.record(make_event("local.pending")).unwrap();
928        let (_push, pull) = engine.full_sync(&transport).await.unwrap();
929
930        assert_eq!(pull.events.len(), 1);
931        assert_eq!(&*since_args.lock().unwrap(), &[0]);
932    }
933
934    #[tokio::test]
935    async fn pull_errors_when_has_more_but_cursor_cannot_advance() {
936        #[derive(Debug)]
937        struct StalledPagingTransport;
938
939        #[async_trait::async_trait]
940        impl Transport for StalledPagingTransport {
941            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
942                Ok(PushResult { accepted: events.len(), remote_head: 0 })
943            }
944
945            async fn pull_events(
946                &self,
947                _since: u64,
948                _limit: usize,
949            ) -> Result<PullResult, SyncError> {
950                // has_more=true but no event sequence progress, so default cursor
951                // derivation cannot safely continue.
952                Ok(PullResult {
953                    events: vec![
954                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
955                            .with_sequence(0),
956                    ],
957                    remote_head: 100,
958                    has_more: true,
959                })
960            }
961        }
962
963        let mut engine = SyncEngine::new(make_config()).unwrap();
964        let err = engine.pull(&StalledPagingTransport).await.unwrap_err();
965        assert!(matches!(err, SyncError::Transport(_)));
966    }
967
    proptest! {
        // Property: with `has_more = true`, `resolve_next_cursor` must either
        // produce a cursor that is strictly greater than `since` or fail with
        // a transport error — it never silently stalls pagination.
        #[test]
        fn resolve_next_cursor_enforces_monotonic_progress(
            since in 0u64..20_000,
            transport_cursor in prop::option::of(0u64..20_000),
            sequences in prop::collection::vec(0u64..20_000, 0..64),
        ) {
            // Build one event per generated sequence; the payload carries the
            // sequence only to make events distinct.
            let events: Vec<SyncEvent> = sequences
                .iter()
                .enumerate()
                .map(|(i, seq)| {
                    SyncEvent::new(
                        format!("evt-{i}"),
                        "entity",
                        format!("id-{i}"),
                        json!({ "s": seq }),
                    )
                    .with_sequence(*seq)
                })
                .collect();

            let result = SyncEngine::resolve_next_cursor(since, &events, true, transport_cursor);
            if let Some(cursor) = transport_cursor {
                // An explicit transport-supplied cursor takes precedence, but
                // only when it actually advances past `since`.
                if cursor > since {
                    prop_assert_eq!(result.unwrap(), Some(cursor));
                } else {
                    prop_assert!(matches!(result, Err(SyncError::Transport(_))));
                }
            } else if let Some(expected) = derive_next_cursor(since, &events) {
                // Without a transport cursor, the engine must agree with the
                // default derivation from the event sequences.
                prop_assert_eq!(result.unwrap(), Some(expected));
            } else {
                // Neither source can advance the cursor: erroring is the only
                // safe outcome while has_more is still true.
                prop_assert!(matches!(result, Err(SyncError::Transport(_))));
            }
        }
    }
1003
    proptest! {
        // Property: with `has_more = false` the resolved cursor is always
        // `None` — regardless of `since`, any transport-supplied cursor, or
        // the event sequences — because there is nothing left to page.
        #[test]
        fn resolve_next_cursor_returns_none_when_transport_signals_no_more(
            since in 0u64..20_000,
            transport_cursor in prop::option::of(0u64..20_000),
            sequences in prop::collection::vec(0u64..20_000, 0..64),
        ) {
            // Build one event per generated sequence with an empty payload.
            let events: Vec<SyncEvent> = sequences
                .iter()
                .enumerate()
                .map(|(i, seq)| {
                    SyncEvent::new(
                        format!("evt-{i}"),
                        "entity",
                        format!("id-{i}"),
                        json!({}),
                    )
                    .with_sequence(*seq)
                })
                .collect();

            let result = SyncEngine::resolve_next_cursor(since, &events, false, transport_cursor)
                .unwrap();
            prop_assert_eq!(result, None);
        }
    }
1030
1031    #[test]
1032    fn engine_debug() {
1033        let engine = SyncEngine::new(make_config()).unwrap();
1034        let debug = format!("{engine:?}");
1035        assert!(debug.contains("SyncEngine"));
1036    }
1037}