Skip to main content

stateset_sync/
engine.rs

1use std::collections::HashSet;
2
3use chrono::Utc;
4
5use crate::buffer::EventBuffer;
6use crate::config::SyncConfig;
7use crate::conflict::{ConflictResolver, ConflictStrategy, Resolution};
8use crate::error::SyncError;
9use crate::event::SyncEvent;
10use crate::outbox::Outbox;
11use crate::state::{SyncState, SyncStatus};
12use crate::transport::{PullPage, PullResult, PushResult, Transport, derive_next_cursor};
13
14/// Safety stop for paginated pull loops in `full_sync`.
15const MAX_PULL_PAGES: usize = 10_000;
16
17/// The sync engine orchestrates synchronization between local state and
18/// a remote sequencer.
19///
20/// This is the Rust equivalent of the JS `SyncEngine` class, providing:
21/// - Event recording to the outbox
22/// - Push (outbox -> remote) via a [`Transport`]
23/// - Pull (remote -> buffer) via a [`Transport`]
24/// - Conflict resolution during pull
25/// - Status reporting
26///
27/// # Examples
28///
29/// ```
30/// use stateset_sync::{SyncEngine, SyncConfig, SyncEvent};
31/// use serde_json::json;
32///
33/// let config = SyncConfig::new("agent-1", "tenant-1", "store-1");
34/// let mut engine = SyncEngine::new(config).expect("valid sync config");
35///
36/// let seq = engine.record(SyncEvent::new("order.created", "order", "ORD-1", json!({"total": 99})));
37/// assert!(seq.is_ok());
38/// assert_eq!(engine.pending_count(), 1);
39/// ```
40#[derive(Debug)]
41pub struct SyncEngine {
42    config: SyncConfig,
43    state: SyncState,
44    outbox: Outbox,
45    buffer: EventBuffer,
46    resolver: ConflictResolver,
47    last_pulled_sequence: u64,
48    next_pull_cursor: Option<u64>,
49    initialized: bool,
50}
51
52impl SyncEngine {
53    /// Create a new `SyncEngine` with the given configuration.
54    ///
55    /// # Errors
56    ///
57    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
58    /// [`SyncError::Storage`] when durable outbox initialization fails.
59    pub fn new(config: SyncConfig) -> Result<Self, SyncError> {
60        Self::try_new(config)
61    }
62
63    /// Create a `SyncEngine` with a custom conflict resolution strategy.
64    ///
65    /// # Errors
66    ///
67    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
68    /// [`SyncError::Storage`] when durable outbox initialization fails.
69    pub fn with_strategy(
70        config: SyncConfig,
71        strategy: ConflictStrategy,
72    ) -> Result<Self, SyncError> {
73        Self::try_with_strategy(config, strategy)
74    }
75
76    /// Fallible constructor that validates config and initializes persistence.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
81    /// [`SyncError::Storage`] when durable outbox initialization fails.
82    pub fn try_new(config: SyncConfig) -> Result<Self, SyncError> {
83        Self::try_with_strategy(config, ConflictStrategy::default())
84    }
85
86    /// Fallible constructor with explicit conflict strategy.
87    ///
88    /// # Errors
89    ///
90    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
91    /// [`SyncError::Storage`] when durable outbox initialization fails.
92    pub fn try_with_strategy(
93        config: SyncConfig,
94        strategy: ConflictStrategy,
95    ) -> Result<Self, SyncError> {
96        config.validate()?;
97        let buffer_capacity = config.resolved_buffer_capacity();
98        let outbox = if let Some(path) = config.outbox_path.as_deref() {
99            Outbox::with_persistence(config.resolved_outbox_capacity(), path)?
100        } else {
101            Outbox::new(config.resolved_outbox_capacity())
102        };
103
104        Ok(Self {
105            config,
106            state: SyncState::default(),
107            outbox,
108            buffer: EventBuffer::new(buffer_capacity),
109            resolver: ConflictResolver::new(strategy),
110            last_pulled_sequence: 0,
111            next_pull_cursor: None,
112            initialized: true,
113        })
114    }
115
116    /// Record an event into the outbox for later push.
117    ///
118    /// Returns the assigned local sequence number.
119    ///
120    /// # Errors
121    ///
122    /// Returns [`SyncError::OutboxFull`] if the outbox is at capacity.
123    pub fn record(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
124        let seq = self.outbox.append(event)?;
125        self.state.local_head = seq;
126        self.state.pending_count = self.outbox.count();
127        Ok(seq)
128    }
129
130    /// Push pending events from the outbox to the remote via the given transport.
131    ///
132    /// Drains up to `batch_size` events from the outbox.
133    ///
134    /// # Errors
135    ///
136    /// Returns [`SyncError::Transport`] if the transport operation fails.
137    pub async fn push(&mut self, transport: &dyn Transport) -> Result<PushResult, SyncError> {
138        let batch_size = self.config.resolved_batch_size();
139        let events: Vec<SyncEvent> = self.outbox.peek(batch_size).into_iter().cloned().collect();
140
141        if events.is_empty() {
142            return Ok(PushResult { accepted: 0, remote_head: self.state.remote_head });
143        }
144
145        let result = transport.push_events(&events).await?;
146        let accepted = result.accepted.min(events.len());
147        if accepted > 0 {
148            let _ = self.outbox.drain(accepted);
149        }
150
151        self.state.remote_head = result.remote_head;
152        self.state.last_push = Some(Utc::now());
153        self.state.pending_count = self.outbox.count();
154
155        Ok(result)
156    }
157
158    /// Pull events from the remote sequencer into the local buffer.
159    ///
160    /// Pulled events are added to the event buffer. If conflicts are
161    /// detected (same `entity_type` + `entity_id` in both outbox and pulled),
162    /// they are resolved using the configured strategy.
163    ///
164    /// # Errors
165    ///
166    /// Returns [`SyncError::Transport`] if the transport operation fails.
167    pub async fn pull(&mut self, transport: &dyn Transport) -> Result<PullResult, SyncError> {
168        let since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
169        let (result, next_cursor) = self.pull_since(transport, since).await?;
170        self.next_pull_cursor = next_cursor;
171        Ok(result)
172    }
173
174    async fn pull_since(
175        &mut self,
176        transport: &dyn Transport,
177        since: u64,
178    ) -> Result<(PullResult, Option<u64>), SyncError> {
179        let limit = self.config.resolved_batch_size();
180        let PullPage { result, next_cursor: transport_next_cursor } =
181            transport.pull_events_page(since, limit).await?;
182
183        // Detect and resolve conflicts between pending outbox events and pulled events
184        let pending: Vec<SyncEvent> =
185            self.outbox.peek(self.outbox.count()).into_iter().cloned().collect();
186        let mut drop_local_ids = HashSet::new();
187        let mut events_to_buffer = Vec::with_capacity(result.events.len());
188
189        for pulled_event in &result.events {
190            let mut keep_remote = true;
191
192            for local_event in &pending {
193                if local_event.entity_type == pulled_event.entity_type
194                    && local_event.entity_id == pulled_event.entity_id
195                {
196                    match self.resolver.resolve(local_event, pulled_event) {
197                        Resolution::KeepLocal => {
198                            keep_remote = false;
199                            break;
200                        }
201                        Resolution::KeepRemote => {
202                            drop_local_ids.insert(local_event.id);
203                        }
204                        Resolution::Merge(merged) => {
205                            drop_local_ids.insert(local_event.id);
206                            events_to_buffer.push(merged);
207                            keep_remote = false;
208                            break;
209                        }
210                    }
211                }
212            }
213
214            if keep_remote {
215                events_to_buffer.push(pulled_event.clone());
216            }
217        }
218
219        if !drop_local_ids.is_empty() {
220            self.outbox.retain(|event| !drop_local_ids.contains(&event.id));
221            self.state.pending_count = self.outbox.count();
222        }
223
224        // Buffer resolved events
225        for event in events_to_buffer {
226            self.buffer.push(event);
227        }
228
229        self.state.remote_head = result.remote_head;
230        self.state.last_pull = Some(Utc::now());
231        let observed_cursor = transport_next_cursor
232            .filter(|cursor| *cursor > since)
233            .or_else(|| derive_next_cursor(since, &result.events))
234            .unwrap_or(since);
235        self.last_pulled_sequence = self.last_pulled_sequence.max(observed_cursor);
236
237        let next_cursor = Self::resolve_next_cursor(
238            since,
239            &result.events,
240            result.has_more,
241            transport_next_cursor,
242        )?;
243
244        Ok((result, next_cursor))
245    }
246
247    fn resolve_next_cursor(
248        since: u64,
249        events: &[SyncEvent],
250        has_more: bool,
251        transport_next_cursor: Option<u64>,
252    ) -> Result<Option<u64>, SyncError> {
253        if !has_more {
254            return Ok(None);
255        }
256
257        let next_cursor = transport_next_cursor.or_else(|| derive_next_cursor(since, events));
258        let Some(next_cursor) = next_cursor else {
259            return Err(SyncError::Transport(
260                "pull pagination stalled: has_more=true but no advancing cursor available"
261                    .to_string(),
262            ));
263        };
264        if next_cursor <= since {
265            return Err(SyncError::Transport(format!(
266                "pull pagination cursor did not advance (since={since}, next_cursor={next_cursor})"
267            )));
268        }
269        Ok(Some(next_cursor))
270    }
271
272    /// Get the current sync status.
273    #[must_use]
274    pub fn status(&self) -> SyncStatus {
275        SyncStatus {
276            initialized: self.initialized,
277            local_head: self.state.local_head,
278            remote_head: self.state.remote_head,
279            pending: self.outbox.count(),
280            lag: self.state.lag(),
281            last_push: self.state.last_push,
282            last_pull: self.state.last_pull,
283            buffered_events: self.buffer.len(),
284        }
285    }
286
287    /// Return the number of events pending in the outbox.
288    #[must_use]
289    pub fn pending_count(&self) -> usize {
290        self.outbox.count()
291    }
292
293    /// Return the number of events currently in the pull buffer.
294    #[must_use]
295    pub fn buffered_count(&self) -> usize {
296        self.buffer.len()
297    }
298
299    /// Drain all events from the pull buffer.
300    pub fn drain_buffer(&mut self) -> Vec<SyncEvent> {
301        self.buffer.drain_all()
302    }
303
304    /// Return a reference to the current sync state.
305    #[must_use]
306    pub const fn state(&self) -> &SyncState {
307        &self.state
308    }
309
310    /// Return a reference to the sync configuration.
311    #[must_use]
312    pub const fn config(&self) -> &SyncConfig {
313        &self.config
314    }
315
316    /// Return a reference to the conflict resolver.
317    #[must_use]
318    pub const fn resolver(&self) -> &ConflictResolver {
319        &self.resolver
320    }
321
322    /// Perform a full sync: push first, then pull.
323    ///
324    /// # Errors
325    ///
326    /// Returns the first error encountered during push or pull.
327    pub async fn full_sync(
328        &mut self,
329        transport: &dyn Transport,
330    ) -> Result<(PushResult, PullResult), SyncError> {
331        let push_result = self.push(transport).await?;
332        let mut since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
333        let (mut pull_result, next_cursor) = self.pull_since(transport, since).await?;
334        self.next_pull_cursor = next_cursor;
335        let mut pull_pages = 1;
336
337        while pull_result.has_more {
338            if pull_pages >= MAX_PULL_PAGES {
339                return Err(SyncError::Transport(
340                    "pull pagination exceeded safety limit".to_string(),
341                ));
342            }
343
344            since = self.next_pull_cursor.ok_or_else(|| {
345                SyncError::Transport(
346                    "pull pagination stalled: has_more=true but no continuation cursor".to_string(),
347                )
348            })?;
349
350            let (next_page, page_cursor) = self.pull_since(transport, since).await?;
351            self.next_pull_cursor = page_cursor;
352
353            pull_result.events.extend(next_page.events);
354            pull_result.remote_head = next_page.remote_head;
355            pull_result.has_more = next_page.has_more;
356            pull_pages += 1;
357        }
358
359        self.next_pull_cursor = None;
360        Ok((push_result, pull_result))
361    }
362}
363
364#[cfg(test)]
365mod tests {
366    use super::*;
367    use crate::transport::NullTransport;
368    use proptest::prelude::*;
369    use serde_json::json;
370    use std::sync::Arc;
371    use std::sync::Mutex;
372    use std::sync::atomic::{AtomicU64, Ordering};
373    use tempfile::tempdir;
374
375    fn make_config() -> SyncConfig {
376        SyncConfig::new("agent-1", "tenant-1", "store-1")
377    }
378
379    fn make_event(event_type: &str) -> SyncEvent {
380        SyncEvent::new(event_type, "order", "ORD-1", json!({}))
381    }
382
383    #[test]
384    fn new_engine() {
385        let engine = SyncEngine::new(make_config()).unwrap();
386        assert_eq!(engine.pending_count(), 0);
387        assert_eq!(engine.buffered_count(), 0);
388        assert!(engine.status().initialized);
389    }
390
391    #[test]
392    fn record_event() {
393        let mut engine = SyncEngine::new(make_config()).unwrap();
394        let seq = engine.record(make_event("order.created")).unwrap();
395        assert_eq!(seq, 1);
396        assert_eq!(engine.pending_count(), 1);
397        assert_eq!(engine.state().local_head, 1);
398    }
399
400    #[test]
401    fn record_multiple_events() {
402        let mut engine = SyncEngine::new(make_config()).unwrap();
403        engine.record(make_event("a")).unwrap();
404        engine.record(make_event("b")).unwrap();
405        engine.record(make_event("c")).unwrap();
406        assert_eq!(engine.pending_count(), 3);
407        assert_eq!(engine.state().local_head, 3);
408    }
409
410    #[tokio::test]
411    async fn push_with_null_transport() {
412        let mut engine = SyncEngine::new(make_config()).unwrap();
413        engine.record(make_event("a")).unwrap();
414        engine.record(make_event("b")).unwrap();
415
416        let transport = NullTransport::new();
417        let result = engine.push(&transport).await.unwrap();
418        assert_eq!(result.accepted, 2);
419        assert_eq!(engine.pending_count(), 0);
420        assert!(engine.state().last_push.is_some());
421    }
422
423    #[tokio::test]
424    async fn push_empty_outbox() {
425        let mut engine = SyncEngine::new(make_config()).unwrap();
426        let transport = NullTransport::new();
427        let result = engine.push(&transport).await.unwrap();
428        assert_eq!(result.accepted, 0);
429    }
430
431    #[tokio::test]
432    async fn pull_with_null_transport() {
433        let mut engine = SyncEngine::new(make_config()).unwrap();
434        let transport = NullTransport::new();
435        let result = engine.pull(&transport).await.unwrap();
436        assert!(result.events.is_empty());
437        assert!(!result.has_more);
438        assert!(engine.state().last_pull.is_some());
439    }
440
441    #[tokio::test]
442    async fn pull_buffers_events() {
443        /// Mock transport that returns predefined events on pull.
444        #[derive(Debug)]
445        struct MockPullTransport {
446            events: Vec<SyncEvent>,
447            head: u64,
448        }
449
450        #[async_trait::async_trait]
451        impl Transport for MockPullTransport {
452            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
453                Ok(PushResult { accepted: events.len(), remote_head: self.head })
454            }
455            async fn pull_events(
456                &self,
457                _since: u64,
458                _limit: usize,
459            ) -> Result<PullResult, SyncError> {
460                Ok(PullResult {
461                    events: self.events.clone(),
462                    remote_head: self.head,
463                    has_more: false,
464                })
465            }
466        }
467
468        let transport = MockPullTransport {
469            events: vec![
470                make_event("pulled-1").with_sequence(1),
471                make_event("pulled-2").with_sequence(2),
472            ],
473            head: 2,
474        };
475
476        let mut engine = SyncEngine::new(make_config()).unwrap();
477        let result = engine.pull(&transport).await.unwrap();
478        assert_eq!(result.events.len(), 2);
479        assert_eq!(engine.buffered_count(), 2);
480        assert_eq!(engine.state().remote_head, 2);
481    }
482
483    #[tokio::test]
484    async fn full_sync() {
485        let mut engine = SyncEngine::new(make_config()).unwrap();
486        engine.record(make_event("local")).unwrap();
487
488        let transport = NullTransport::new();
489        let (push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
490        assert_eq!(push_result.accepted, 1);
491        assert!(pull_result.events.is_empty());
492        assert_eq!(engine.pending_count(), 0);
493    }
494
495    #[test]
496    fn status_reporting() {
497        let mut engine = SyncEngine::new(make_config()).unwrap();
498        engine.record(make_event("a")).unwrap();
499        engine.record(make_event("b")).unwrap();
500
501        let status = engine.status();
502        assert!(status.initialized);
503        assert_eq!(status.pending, 2);
504        assert_eq!(status.local_head, 2);
505        assert_eq!(status.remote_head, 0);
506        assert_eq!(status.lag, 0);
507        assert!(status.last_push.is_none());
508    }
509
510    #[test]
511    fn drain_buffer() {
512        let mut engine = SyncEngine::new(make_config()).unwrap();
513        // Manually push to buffer via engine internals
514        engine.buffer.push(make_event("buffered"));
515        assert_eq!(engine.buffered_count(), 1);
516
517        let drained = engine.drain_buffer();
518        assert_eq!(drained.len(), 1);
519        assert_eq!(engine.buffered_count(), 0);
520    }
521
522    #[test]
523    fn engine_with_strategy() {
524        let engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins).unwrap();
525        assert_eq!(engine.resolver().strategy(), ConflictStrategy::LocalWins);
526    }
527
528    #[test]
529    fn config_accessor() {
530        let config = make_config();
531        let engine = SyncEngine::new(config).unwrap();
532        assert_eq!(engine.config().agent_id, "agent-1");
533    }
534
535    #[test]
536    fn try_new_rejects_invalid_config() {
537        let bad = SyncConfig::new("", "tenant", "store");
538        assert!(SyncEngine::try_new(bad).is_err());
539    }
540
541    #[test]
542    fn try_new_with_persistent_outbox_restores_pending_events() {
543        let dir = tempdir().unwrap();
544        let outbox_path = dir.path().join("sync-outbox.json");
545        let path_str = outbox_path.to_string_lossy().to_string();
546
547        {
548            let mut engine = SyncEngine::try_new(
549                SyncConfig::new("agent-1", "tenant-1", "store-1")
550                    .with_outbox_path(path_str.clone()),
551            )
552            .unwrap();
553            engine.record(make_event("persisted-a")).unwrap();
554            engine.record(make_event("persisted-b")).unwrap();
555            assert_eq!(engine.pending_count(), 2);
556        }
557
558        let engine = SyncEngine::try_new(
559            SyncConfig::new("agent-1", "tenant-1", "store-1").with_outbox_path(path_str),
560        )
561        .unwrap();
562        assert_eq!(engine.pending_count(), 2);
563    }
564
565    #[tokio::test]
566    async fn push_respects_batch_size() {
567        let config = SyncConfig::new("agent-1", "tenant-1", "store-1").with_batch_size(2);
568        let mut engine = SyncEngine::new(config).unwrap();
569        engine.record(make_event("a")).unwrap();
570        engine.record(make_event("b")).unwrap();
571        engine.record(make_event("c")).unwrap();
572
573        let transport = NullTransport::new();
574        let result = engine.push(&transport).await.unwrap();
575        // Should only push 2 due to batch_size
576        assert_eq!(result.accepted, 2);
577        assert_eq!(engine.pending_count(), 1);
578    }
579
580    #[tokio::test]
581    async fn push_updates_state() {
582        /// Mock transport that returns an increasing remote head.
583        #[derive(Debug)]
584        struct MockHeadTransport {
585            head: Arc<AtomicU64>,
586        }
587
588        #[async_trait::async_trait]
589        impl Transport for MockHeadTransport {
590            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
591                let new_head = self.head.fetch_add(events.len() as u64, Ordering::SeqCst)
592                    + events.len() as u64;
593                Ok(PushResult { accepted: events.len(), remote_head: new_head })
594            }
595            async fn pull_events(
596                &self,
597                _since: u64,
598                _limit: usize,
599            ) -> Result<PullResult, SyncError> {
600                Ok(PullResult {
601                    events: vec![],
602                    remote_head: self.head.load(Ordering::SeqCst),
603                    has_more: false,
604                })
605            }
606        }
607
608        let transport = MockHeadTransport { head: Arc::new(AtomicU64::new(0)) };
609
610        let mut engine = SyncEngine::new(make_config()).unwrap();
611        engine.record(make_event("a")).unwrap();
612        engine.record(make_event("b")).unwrap();
613
614        let result = engine.push(&transport).await.unwrap();
615        assert_eq!(result.remote_head, 2);
616        assert_eq!(engine.state().remote_head, 2);
617    }
618
619    #[tokio::test]
620    async fn transport_error_propagates() {
621        /// Transport that always fails.
622        #[derive(Debug)]
623        struct FailTransport;
624
625        #[async_trait::async_trait]
626        impl Transport for FailTransport {
627            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
628                Err(SyncError::Transport("network down".into()))
629            }
630            async fn pull_events(
631                &self,
632                _since: u64,
633                _limit: usize,
634            ) -> Result<PullResult, SyncError> {
635                Err(SyncError::Transport("network down".into()))
636            }
637        }
638
639        let mut engine = SyncEngine::new(make_config()).unwrap();
640        engine.record(make_event("a")).unwrap();
641
642        let transport = FailTransport;
643        let result = engine.push(&transport).await;
644        assert!(result.is_err());
645        assert!(matches!(result.unwrap_err(), SyncError::Transport(_)));
646        // Failed push must not drop local events.
647        assert_eq!(engine.pending_count(), 1);
648
649        let pull_result = engine.pull(&transport).await;
650        assert!(pull_result.is_err());
651    }
652
653    #[tokio::test]
654    async fn push_only_drains_accepted_events() {
655        /// Transport that only accepts one event from each batch.
656        #[derive(Debug)]
657        struct PartialAcceptTransport;
658
659        #[async_trait::async_trait]
660        impl Transport for PartialAcceptTransport {
661            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
662                Ok(PushResult { accepted: 1, remote_head: 1 })
663            }
664
665            async fn pull_events(
666                &self,
667                _since: u64,
668                _limit: usize,
669            ) -> Result<PullResult, SyncError> {
670                Ok(PullResult { events: vec![], remote_head: 1, has_more: false })
671            }
672        }
673
674        let mut engine = SyncEngine::new(make_config()).unwrap();
675        engine.record(make_event("a")).unwrap();
676        engine.record(make_event("b")).unwrap();
677        engine.record(make_event("c")).unwrap();
678
679        let result = engine.push(&PartialAcceptTransport).await.unwrap();
680        assert_eq!(result.accepted, 1);
681        assert_eq!(engine.pending_count(), 2);
682    }
683
684    #[tokio::test]
685    async fn pull_conflict_resolution() {
686        /// Transport that returns events conflicting with local outbox.
687        #[derive(Debug)]
688        struct ConflictTransport;
689
690        #[async_trait::async_trait]
691        impl Transport for ConflictTransport {
692            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
693                Ok(PushResult { accepted: events.len(), remote_head: 10 })
694            }
695            async fn pull_events(
696                &self,
697                _since: u64,
698                _limit: usize,
699            ) -> Result<PullResult, SyncError> {
700                // Return an event for the same entity as the pending local event
701                let remote_event =
702                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
703                        .with_sequence(5);
704                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
705            }
706        }
707
708        let mut engine =
709            SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins).unwrap();
710        engine
711            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
712            .unwrap();
713
714        let transport = ConflictTransport;
715        let result = engine.pull(&transport).await.unwrap();
716        assert_eq!(result.events.len(), 1);
717        // RemoteWins removes conflicting local outbox events and keeps pulled event.
718        assert_eq!(engine.buffered_count(), 1);
719        assert_eq!(engine.pending_count(), 0);
720    }
721
722    #[tokio::test]
723    async fn pull_conflict_local_wins_keeps_pending_and_skips_remote() {
724        #[derive(Debug)]
725        struct ConflictTransport;
726
727        #[async_trait::async_trait]
728        impl Transport for ConflictTransport {
729            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
730                Ok(PushResult { accepted: events.len(), remote_head: 10 })
731            }
732            async fn pull_events(
733                &self,
734                _since: u64,
735                _limit: usize,
736            ) -> Result<PullResult, SyncError> {
737                let remote_event =
738                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
739                        .with_sequence(5);
740                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
741            }
742        }
743
744        let mut engine =
745            SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins).unwrap();
746        engine
747            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
748            .unwrap();
749
750        let result = engine.pull(&ConflictTransport).await.unwrap();
751        assert_eq!(result.events.len(), 1);
752        assert_eq!(engine.pending_count(), 1);
753        assert_eq!(engine.buffered_count(), 0);
754    }
755
756    #[tokio::test]
757    async fn full_sync_paginates_pull_until_complete() {
758        #[derive(Debug)]
759        struct PagingTransport {
760            pulls: Arc<AtomicU64>,
761            since_args: Arc<Mutex<Vec<u64>>>,
762        }
763
764        #[async_trait::async_trait]
765        impl Transport for PagingTransport {
766            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
767                Ok(PushResult { accepted: events.len(), remote_head: 0 })
768            }
769
770            async fn pull_events(
771                &self,
772                since: u64,
773                _limit: usize,
774            ) -> Result<PullResult, SyncError> {
775                self.since_args.lock().unwrap().push(since);
776                let call = self.pulls.fetch_add(1, Ordering::SeqCst);
777                if call == 0 {
778                    Ok(PullResult {
779                        events: vec![
780                            SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
781                                .with_sequence(1),
782                        ],
783                        // Simulate remote_head as global head watermark, not page cursor.
784                        remote_head: 999,
785                        has_more: true,
786                    })
787                } else {
788                    Ok(PullResult {
789                        events: vec![
790                            SyncEvent::new("order.updated", "order", "ORD-2", json!({}))
791                                .with_sequence(2),
792                        ],
793                        remote_head: 999,
794                        has_more: false,
795                    })
796                }
797            }
798        }
799
800        let since_args = Arc::new(Mutex::new(Vec::new()));
801        let transport = PagingTransport {
802            pulls: Arc::new(AtomicU64::new(0)),
803            since_args: Arc::clone(&since_args),
804        };
805        let mut engine = SyncEngine::new(make_config()).unwrap();
806        let (_push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
807
808        assert_eq!(pull_result.events.len(), 2);
809        assert!(!pull_result.has_more);
810        assert_eq!(engine.buffered_count(), 2);
811        assert_eq!(engine.state().remote_head, 999);
812        assert_eq!(transport.pulls.load(Ordering::SeqCst), 2);
813        assert_eq!(&*since_args.lock().unwrap(), &[0, 1]);
814    }
815
816    #[tokio::test]
817    async fn pull_cursor_does_not_advance_from_push_remote_head() {
818        #[derive(Debug)]
819        struct HeadSkewTransport {
820            since_args: Arc<Mutex<Vec<u64>>>,
821        }
822
823        #[async_trait::async_trait]
824        impl Transport for HeadSkewTransport {
825            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
826                Ok(PushResult { accepted: events.len(), remote_head: 1000 })
827            }
828
829            async fn pull_events(
830                &self,
831                since: u64,
832                _limit: usize,
833            ) -> Result<PullResult, SyncError> {
834                self.since_args.lock().unwrap().push(since);
835                Ok(PullResult {
836                    events: vec![
837                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
838                            .with_sequence(7),
839                    ],
840                    remote_head: 1000,
841                    has_more: false,
842                })
843            }
844        }
845
846        let since_args = Arc::new(Mutex::new(Vec::new()));
847        let transport = HeadSkewTransport { since_args: Arc::clone(&since_args) };
848        let mut engine = SyncEngine::new(make_config()).unwrap();
849
850        engine.record(make_event("local.pending")).unwrap();
851        let (_push, pull) = engine.full_sync(&transport).await.unwrap();
852
853        assert_eq!(pull.events.len(), 1);
854        assert_eq!(&*since_args.lock().unwrap(), &[0]);
855    }
856
857    #[tokio::test]
858    async fn pull_errors_when_has_more_but_cursor_cannot_advance() {
859        #[derive(Debug)]
860        struct StalledPagingTransport;
861
862        #[async_trait::async_trait]
863        impl Transport for StalledPagingTransport {
864            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
865                Ok(PushResult { accepted: events.len(), remote_head: 0 })
866            }
867
868            async fn pull_events(
869                &self,
870                _since: u64,
871                _limit: usize,
872            ) -> Result<PullResult, SyncError> {
873                // has_more=true but no event sequence progress, so default cursor
874                // derivation cannot safely continue.
875                Ok(PullResult {
876                    events: vec![
877                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
878                            .with_sequence(0),
879                    ],
880                    remote_head: 100,
881                    has_more: true,
882                })
883            }
884        }
885
886        let mut engine = SyncEngine::new(make_config()).unwrap();
887        let err = engine.pull(&StalledPagingTransport).await.unwrap_err();
888        assert!(matches!(err, SyncError::Transport(_)));
889    }
890
891    proptest! {
892        #[test]
893        fn resolve_next_cursor_enforces_monotonic_progress(
894            since in 0u64..20_000,
895            transport_cursor in prop::option::of(0u64..20_000),
896            sequences in prop::collection::vec(0u64..20_000, 0..64),
897        ) {
898            let events: Vec<SyncEvent> = sequences
899                .iter()
900                .enumerate()
901                .map(|(i, seq)| {
902                    SyncEvent::new(
903                        format!("evt-{i}"),
904                        "entity",
905                        format!("id-{i}"),
906                        json!({ "s": seq }),
907                    )
908                    .with_sequence(*seq)
909                })
910                .collect();
911
912            let result = SyncEngine::resolve_next_cursor(since, &events, true, transport_cursor);
913            if let Some(cursor) = transport_cursor {
914                if cursor > since {
915                    prop_assert_eq!(result.unwrap(), Some(cursor));
916                } else {
917                    prop_assert!(matches!(result, Err(SyncError::Transport(_))));
918                }
919            } else if let Some(expected) = derive_next_cursor(since, &events) {
920                prop_assert_eq!(result.unwrap(), Some(expected));
921            } else {
922                prop_assert!(matches!(result, Err(SyncError::Transport(_))));
923            }
924        }
925    }
926
927    proptest! {
928        #[test]
929        fn resolve_next_cursor_returns_none_when_transport_signals_no_more(
930            since in 0u64..20_000,
931            transport_cursor in prop::option::of(0u64..20_000),
932            sequences in prop::collection::vec(0u64..20_000, 0..64),
933        ) {
934            let events: Vec<SyncEvent> = sequences
935                .iter()
936                .enumerate()
937                .map(|(i, seq)| {
938                    SyncEvent::new(
939                        format!("evt-{i}"),
940                        "entity",
941                        format!("id-{i}"),
942                        json!({}),
943                    )
944                    .with_sequence(*seq)
945                })
946                .collect();
947
948            let result = SyncEngine::resolve_next_cursor(since, &events, false, transport_cursor)
949                .unwrap();
950            prop_assert_eq!(result, None);
951        }
952    }
953
954    #[test]
955    fn engine_debug() {
956        let engine = SyncEngine::new(make_config()).unwrap();
957        let debug = format!("{engine:?}");
958        assert!(debug.contains("SyncEngine"));
959    }
960}