// stateset_sync/engine.rs
1use std::collections::HashSet;
2
3use chrono::Utc;
4
5use crate::buffer::EventBuffer;
6use crate::config::SyncConfig;
7use crate::conflict::{ConflictResolver, ConflictStrategy, Resolution};
8use crate::error::SyncError;
9use crate::event::SyncEvent;
10use crate::outbox::Outbox;
11use crate::state::{SyncState, SyncStatus};
12use crate::transport::{PullPage, PullResult, PushResult, Transport, derive_next_cursor};
13
/// Safety stop for paginated pull loops in `full_sync`: once this many pages
/// have been fetched while the remote still reports `has_more`, the loop
/// aborts with a transport error instead of spinning forever.
const MAX_PULL_PAGES: usize = 10_000;
16
17/// The sync engine orchestrates synchronization between local state and
18/// a remote sequencer.
19///
20/// This is the Rust equivalent of the JS `SyncEngine` class, providing:
21/// - Event recording to the outbox
22/// - Push (outbox -> remote) via a [`Transport`]
23/// - Pull (remote -> buffer) via a [`Transport`]
24/// - Conflict resolution during pull
25/// - Status reporting
26///
27/// # Examples
28///
29/// ```
30/// use stateset_sync::{SyncEngine, SyncConfig, SyncEvent};
31/// use serde_json::json;
32///
33/// let config = SyncConfig::new("agent-1", "tenant-1", "store-1");
34/// let mut engine = SyncEngine::new(config);
35///
36/// let seq = engine.record(SyncEvent::new("order.created", "order", "ORD-1", json!({"total": 99})));
37/// assert!(seq.is_ok());
38/// assert_eq!(engine.pending_count(), 1);
39/// ```
#[derive(Debug)]
pub struct SyncEngine {
    /// Validated configuration (capacities, batch size, optional outbox path).
    config: SyncConfig,
    /// Mutable sync watermarks and timestamps (local/remote heads, last push/pull).
    state: SyncState,
    /// Locally recorded events awaiting push to the remote.
    outbox: Outbox,
    /// Events pulled from the remote, held until `drain_buffer` is called.
    buffer: EventBuffer,
    /// Resolver applied when a pulled event and a pending outbox event
    /// target the same `entity_type` + `entity_id`.
    resolver: ConflictResolver,
    /// Highest remote cursor observed across completed pulls.
    last_pulled_sequence: u64,
    /// Continuation cursor while a pull reported `has_more`; `None` otherwise.
    next_pull_cursor: Option<u64>,
    /// Set to `true` on construction; surfaced through `status()`.
    initialized: bool,
}
51
52impl SyncEngine {
53    /// Create a new `SyncEngine` with the given configuration.
54    #[must_use]
55    pub fn new(config: SyncConfig) -> Self {
56        Self::try_new(config).unwrap_or_else(|err| panic!("invalid sync configuration: {err}"))
57    }
58
59    /// Create a `SyncEngine` with a custom conflict resolution strategy.
60    #[must_use]
61    pub fn with_strategy(config: SyncConfig, strategy: ConflictStrategy) -> Self {
62        Self::try_with_strategy(config, strategy)
63            .unwrap_or_else(|err| panic!("invalid sync configuration: {err}"))
64    }
65
66    /// Fallible constructor that validates config and initializes persistence.
67    ///
68    /// # Errors
69    ///
70    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
71    /// [`SyncError::Storage`] when durable outbox initialization fails.
72    pub fn try_new(config: SyncConfig) -> Result<Self, SyncError> {
73        Self::try_with_strategy(config, ConflictStrategy::default())
74    }
75
76    /// Fallible constructor with explicit conflict strategy.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`SyncError::InvalidConfig`] for invalid settings or
81    /// [`SyncError::Storage`] when durable outbox initialization fails.
82    pub fn try_with_strategy(
83        config: SyncConfig,
84        strategy: ConflictStrategy,
85    ) -> Result<Self, SyncError> {
86        config.validate()?;
87        let buffer_capacity = config.resolved_buffer_capacity();
88        let outbox = if let Some(path) = config.outbox_path.as_deref() {
89            Outbox::with_persistence(config.resolved_outbox_capacity(), path)?
90        } else {
91            Outbox::new(config.resolved_outbox_capacity())
92        };
93
94        Ok(Self {
95            config,
96            state: SyncState::default(),
97            outbox,
98            buffer: EventBuffer::new(buffer_capacity),
99            resolver: ConflictResolver::new(strategy),
100            last_pulled_sequence: 0,
101            next_pull_cursor: None,
102            initialized: true,
103        })
104    }
105
106    /// Record an event into the outbox for later push.
107    ///
108    /// Returns the assigned local sequence number.
109    ///
110    /// # Errors
111    ///
112    /// Returns [`SyncError::OutboxFull`] if the outbox is at capacity.
113    pub fn record(&mut self, event: SyncEvent) -> Result<u64, SyncError> {
114        let seq = self.outbox.append(event)?;
115        self.state.local_head = seq;
116        self.state.pending_count = self.outbox.count();
117        Ok(seq)
118    }
119
120    /// Push pending events from the outbox to the remote via the given transport.
121    ///
122    /// Drains up to `batch_size` events from the outbox.
123    ///
124    /// # Errors
125    ///
126    /// Returns [`SyncError::Transport`] if the transport operation fails.
127    pub async fn push(&mut self, transport: &dyn Transport) -> Result<PushResult, SyncError> {
128        let batch_size = self.config.resolved_batch_size();
129        let events: Vec<SyncEvent> = self.outbox.peek(batch_size).into_iter().cloned().collect();
130
131        if events.is_empty() {
132            return Ok(PushResult { accepted: 0, remote_head: self.state.remote_head });
133        }
134
135        let result = transport.push_events(&events).await?;
136        let accepted = result.accepted.min(events.len());
137        if accepted > 0 {
138            let _ = self.outbox.drain(accepted);
139        }
140
141        self.state.remote_head = result.remote_head;
142        self.state.last_push = Some(Utc::now());
143        self.state.pending_count = self.outbox.count();
144
145        Ok(result)
146    }
147
148    /// Pull events from the remote sequencer into the local buffer.
149    ///
150    /// Pulled events are added to the event buffer. If conflicts are
151    /// detected (same `entity_type` + `entity_id` in both outbox and pulled),
152    /// they are resolved using the configured strategy.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`SyncError::Transport`] if the transport operation fails.
157    pub async fn pull(&mut self, transport: &dyn Transport) -> Result<PullResult, SyncError> {
158        let since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
159        let (result, next_cursor) = self.pull_since(transport, since).await?;
160        self.next_pull_cursor = next_cursor;
161        Ok(result)
162    }
163
164    async fn pull_since(
165        &mut self,
166        transport: &dyn Transport,
167        since: u64,
168    ) -> Result<(PullResult, Option<u64>), SyncError> {
169        let limit = self.config.resolved_batch_size();
170        let PullPage { result, next_cursor: transport_next_cursor } =
171            transport.pull_events_page(since, limit).await?;
172
173        // Detect and resolve conflicts between pending outbox events and pulled events
174        let pending: Vec<SyncEvent> =
175            self.outbox.peek(self.outbox.count()).into_iter().cloned().collect();
176        let mut drop_local_ids = HashSet::new();
177        let mut events_to_buffer = Vec::with_capacity(result.events.len());
178
179        for pulled_event in &result.events {
180            let mut keep_remote = true;
181
182            for local_event in &pending {
183                if local_event.entity_type == pulled_event.entity_type
184                    && local_event.entity_id == pulled_event.entity_id
185                {
186                    match self.resolver.resolve(local_event, pulled_event) {
187                        Resolution::KeepLocal => {
188                            keep_remote = false;
189                            break;
190                        }
191                        Resolution::KeepRemote => {
192                            drop_local_ids.insert(local_event.id);
193                        }
194                        Resolution::Merge(merged) => {
195                            drop_local_ids.insert(local_event.id);
196                            events_to_buffer.push(merged);
197                            keep_remote = false;
198                            break;
199                        }
200                    }
201                }
202            }
203
204            if keep_remote {
205                events_to_buffer.push(pulled_event.clone());
206            }
207        }
208
209        if !drop_local_ids.is_empty() {
210            self.outbox.retain(|event| !drop_local_ids.contains(&event.id));
211            self.state.pending_count = self.outbox.count();
212        }
213
214        // Buffer resolved events
215        for event in events_to_buffer {
216            self.buffer.push(event);
217        }
218
219        self.state.remote_head = result.remote_head;
220        self.state.last_pull = Some(Utc::now());
221        let observed_cursor = transport_next_cursor
222            .filter(|cursor| *cursor > since)
223            .or_else(|| derive_next_cursor(since, &result.events))
224            .unwrap_or(since);
225        self.last_pulled_sequence = self.last_pulled_sequence.max(observed_cursor);
226
227        let next_cursor = Self::resolve_next_cursor(
228            since,
229            &result.events,
230            result.has_more,
231            transport_next_cursor,
232        )?;
233
234        Ok((result, next_cursor))
235    }
236
237    fn resolve_next_cursor(
238        since: u64,
239        events: &[SyncEvent],
240        has_more: bool,
241        transport_next_cursor: Option<u64>,
242    ) -> Result<Option<u64>, SyncError> {
243        if !has_more {
244            return Ok(None);
245        }
246
247        let next_cursor = transport_next_cursor.or_else(|| derive_next_cursor(since, events));
248        let Some(next_cursor) = next_cursor else {
249            return Err(SyncError::Transport(
250                "pull pagination stalled: has_more=true but no advancing cursor available"
251                    .to_string(),
252            ));
253        };
254        if next_cursor <= since {
255            return Err(SyncError::Transport(format!(
256                "pull pagination cursor did not advance (since={since}, next_cursor={next_cursor})"
257            )));
258        }
259        Ok(Some(next_cursor))
260    }
261
262    /// Get the current sync status.
263    #[must_use]
264    pub fn status(&self) -> SyncStatus {
265        SyncStatus {
266            initialized: self.initialized,
267            local_head: self.state.local_head,
268            remote_head: self.state.remote_head,
269            pending: self.outbox.count(),
270            lag: self.state.lag(),
271            last_push: self.state.last_push,
272            last_pull: self.state.last_pull,
273            buffered_events: self.buffer.len(),
274        }
275    }
276
277    /// Return the number of events pending in the outbox.
278    #[must_use]
279    pub fn pending_count(&self) -> usize {
280        self.outbox.count()
281    }
282
283    /// Return the number of events currently in the pull buffer.
284    #[must_use]
285    pub fn buffered_count(&self) -> usize {
286        self.buffer.len()
287    }
288
289    /// Drain all events from the pull buffer.
290    pub fn drain_buffer(&mut self) -> Vec<SyncEvent> {
291        self.buffer.drain_all()
292    }
293
294    /// Return a reference to the current sync state.
295    #[must_use]
296    pub const fn state(&self) -> &SyncState {
297        &self.state
298    }
299
300    /// Return a reference to the sync configuration.
301    #[must_use]
302    pub const fn config(&self) -> &SyncConfig {
303        &self.config
304    }
305
306    /// Return a reference to the conflict resolver.
307    #[must_use]
308    pub const fn resolver(&self) -> &ConflictResolver {
309        &self.resolver
310    }
311
312    /// Perform a full sync: push first, then pull.
313    ///
314    /// # Errors
315    ///
316    /// Returns the first error encountered during push or pull.
317    pub async fn full_sync(
318        &mut self,
319        transport: &dyn Transport,
320    ) -> Result<(PushResult, PullResult), SyncError> {
321        let push_result = self.push(transport).await?;
322        let mut since = self.next_pull_cursor.unwrap_or(self.last_pulled_sequence);
323        let (mut pull_result, next_cursor) = self.pull_since(transport, since).await?;
324        self.next_pull_cursor = next_cursor;
325        let mut pull_pages = 1;
326
327        while pull_result.has_more {
328            if pull_pages >= MAX_PULL_PAGES {
329                return Err(SyncError::Transport(
330                    "pull pagination exceeded safety limit".to_string(),
331                ));
332            }
333
334            since = self.next_pull_cursor.ok_or_else(|| {
335                SyncError::Transport(
336                    "pull pagination stalled: has_more=true but no continuation cursor".to_string(),
337                )
338            })?;
339
340            let (next_page, page_cursor) = self.pull_since(transport, since).await?;
341            self.next_pull_cursor = page_cursor;
342
343            pull_result.events.extend(next_page.events);
344            pull_result.remote_head = next_page.remote_head;
345            pull_result.has_more = next_page.has_more;
346            pull_pages += 1;
347        }
348
349        self.next_pull_cursor = None;
350        Ok((push_result, pull_result))
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357    use crate::transport::NullTransport;
358    use proptest::prelude::*;
359    use serde_json::json;
360    use std::sync::Arc;
361    use std::sync::Mutex;
362    use std::sync::atomic::{AtomicU64, Ordering};
363    use tempfile::tempdir;
364
    /// Baseline configuration shared by most tests in this module.
    fn make_config() -> SyncConfig {
        SyncConfig::new("agent-1", "tenant-1", "store-1")
    }
368
    /// Event of the given type targeting the fixed entity `order`/`ORD-1`.
    fn make_event(event_type: &str) -> SyncEvent {
        SyncEvent::new(event_type, "order", "ORD-1", json!({}))
    }
372
373    #[test]
374    fn new_engine() {
375        let engine = SyncEngine::new(make_config());
376        assert_eq!(engine.pending_count(), 0);
377        assert_eq!(engine.buffered_count(), 0);
378        assert!(engine.status().initialized);
379    }
380
381    #[test]
382    fn record_event() {
383        let mut engine = SyncEngine::new(make_config());
384        let seq = engine.record(make_event("order.created")).unwrap();
385        assert_eq!(seq, 1);
386        assert_eq!(engine.pending_count(), 1);
387        assert_eq!(engine.state().local_head, 1);
388    }
389
390    #[test]
391    fn record_multiple_events() {
392        let mut engine = SyncEngine::new(make_config());
393        engine.record(make_event("a")).unwrap();
394        engine.record(make_event("b")).unwrap();
395        engine.record(make_event("c")).unwrap();
396        assert_eq!(engine.pending_count(), 3);
397        assert_eq!(engine.state().local_head, 3);
398    }
399
400    #[tokio::test]
401    async fn push_with_null_transport() {
402        let mut engine = SyncEngine::new(make_config());
403        engine.record(make_event("a")).unwrap();
404        engine.record(make_event("b")).unwrap();
405
406        let transport = NullTransport::new();
407        let result = engine.push(&transport).await.unwrap();
408        assert_eq!(result.accepted, 2);
409        assert_eq!(engine.pending_count(), 0);
410        assert!(engine.state().last_push.is_some());
411    }
412
413    #[tokio::test]
414    async fn push_empty_outbox() {
415        let mut engine = SyncEngine::new(make_config());
416        let transport = NullTransport::new();
417        let result = engine.push(&transport).await.unwrap();
418        assert_eq!(result.accepted, 0);
419    }
420
421    #[tokio::test]
422    async fn pull_with_null_transport() {
423        let mut engine = SyncEngine::new(make_config());
424        let transport = NullTransport::new();
425        let result = engine.pull(&transport).await.unwrap();
426        assert!(result.events.is_empty());
427        assert!(!result.has_more);
428        assert!(engine.state().last_pull.is_some());
429    }
430
431    #[tokio::test]
432    async fn pull_buffers_events() {
433        /// Mock transport that returns predefined events on pull.
434        #[derive(Debug)]
435        struct MockPullTransport {
436            events: Vec<SyncEvent>,
437            head: u64,
438        }
439
440        #[async_trait::async_trait]
441        impl Transport for MockPullTransport {
442            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
443                Ok(PushResult { accepted: events.len(), remote_head: self.head })
444            }
445            async fn pull_events(
446                &self,
447                _since: u64,
448                _limit: usize,
449            ) -> Result<PullResult, SyncError> {
450                Ok(PullResult {
451                    events: self.events.clone(),
452                    remote_head: self.head,
453                    has_more: false,
454                })
455            }
456        }
457
458        let transport = MockPullTransport {
459            events: vec![
460                make_event("pulled-1").with_sequence(1),
461                make_event("pulled-2").with_sequence(2),
462            ],
463            head: 2,
464        };
465
466        let mut engine = SyncEngine::new(make_config());
467        let result = engine.pull(&transport).await.unwrap();
468        assert_eq!(result.events.len(), 2);
469        assert_eq!(engine.buffered_count(), 2);
470        assert_eq!(engine.state().remote_head, 2);
471    }
472
473    #[tokio::test]
474    async fn full_sync() {
475        let mut engine = SyncEngine::new(make_config());
476        engine.record(make_event("local")).unwrap();
477
478        let transport = NullTransport::new();
479        let (push_result, pull_result) = engine.full_sync(&transport).await.unwrap();
480        assert_eq!(push_result.accepted, 1);
481        assert!(pull_result.events.is_empty());
482        assert_eq!(engine.pending_count(), 0);
483    }
484
485    #[test]
486    fn status_reporting() {
487        let mut engine = SyncEngine::new(make_config());
488        engine.record(make_event("a")).unwrap();
489        engine.record(make_event("b")).unwrap();
490
491        let status = engine.status();
492        assert!(status.initialized);
493        assert_eq!(status.pending, 2);
494        assert_eq!(status.local_head, 2);
495        assert_eq!(status.remote_head, 0);
496        assert_eq!(status.lag, 0);
497        assert!(status.last_push.is_none());
498    }
499
500    #[test]
501    fn drain_buffer() {
502        let mut engine = SyncEngine::new(make_config());
503        // Manually push to buffer via engine internals
504        engine.buffer.push(make_event("buffered"));
505        assert_eq!(engine.buffered_count(), 1);
506
507        let drained = engine.drain_buffer();
508        assert_eq!(drained.len(), 1);
509        assert_eq!(engine.buffered_count(), 0);
510    }
511
512    #[test]
513    fn engine_with_strategy() {
514        let engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins);
515        assert_eq!(engine.resolver().strategy(), ConflictStrategy::LocalWins);
516    }
517
518    #[test]
519    fn config_accessor() {
520        let config = make_config();
521        let engine = SyncEngine::new(config);
522        assert_eq!(engine.config().agent_id, "agent-1");
523    }
524
525    #[test]
526    fn try_new_rejects_invalid_config() {
527        let bad = SyncConfig::new("", "tenant", "store");
528        assert!(SyncEngine::try_new(bad).is_err());
529    }
530
531    #[test]
532    fn try_new_with_persistent_outbox_restores_pending_events() {
533        let dir = tempdir().unwrap();
534        let outbox_path = dir.path().join("sync-outbox.json");
535        let path_str = outbox_path.to_string_lossy().to_string();
536
537        {
538            let mut engine = SyncEngine::try_new(
539                SyncConfig::new("agent-1", "tenant-1", "store-1")
540                    .with_outbox_path(path_str.clone()),
541            )
542            .unwrap();
543            engine.record(make_event("persisted-a")).unwrap();
544            engine.record(make_event("persisted-b")).unwrap();
545            assert_eq!(engine.pending_count(), 2);
546        }
547
548        let engine = SyncEngine::try_new(
549            SyncConfig::new("agent-1", "tenant-1", "store-1").with_outbox_path(path_str),
550        )
551        .unwrap();
552        assert_eq!(engine.pending_count(), 2);
553    }
554
555    #[tokio::test]
556    async fn push_respects_batch_size() {
557        let config = SyncConfig::new("agent-1", "tenant-1", "store-1").with_batch_size(2);
558        let mut engine = SyncEngine::new(config);
559        engine.record(make_event("a")).unwrap();
560        engine.record(make_event("b")).unwrap();
561        engine.record(make_event("c")).unwrap();
562
563        let transport = NullTransport::new();
564        let result = engine.push(&transport).await.unwrap();
565        // Should only push 2 due to batch_size
566        assert_eq!(result.accepted, 2);
567        assert_eq!(engine.pending_count(), 1);
568    }
569
570    #[tokio::test]
571    async fn push_updates_state() {
572        /// Mock transport that returns an increasing remote head.
573        #[derive(Debug)]
574        struct MockHeadTransport {
575            head: Arc<AtomicU64>,
576        }
577
578        #[async_trait::async_trait]
579        impl Transport for MockHeadTransport {
580            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
581                let new_head = self.head.fetch_add(events.len() as u64, Ordering::SeqCst)
582                    + events.len() as u64;
583                Ok(PushResult { accepted: events.len(), remote_head: new_head })
584            }
585            async fn pull_events(
586                &self,
587                _since: u64,
588                _limit: usize,
589            ) -> Result<PullResult, SyncError> {
590                Ok(PullResult {
591                    events: vec![],
592                    remote_head: self.head.load(Ordering::SeqCst),
593                    has_more: false,
594                })
595            }
596        }
597
598        let transport = MockHeadTransport { head: Arc::new(AtomicU64::new(0)) };
599
600        let mut engine = SyncEngine::new(make_config());
601        engine.record(make_event("a")).unwrap();
602        engine.record(make_event("b")).unwrap();
603
604        let result = engine.push(&transport).await.unwrap();
605        assert_eq!(result.remote_head, 2);
606        assert_eq!(engine.state().remote_head, 2);
607    }
608
609    #[tokio::test]
610    async fn transport_error_propagates() {
611        /// Transport that always fails.
612        #[derive(Debug)]
613        struct FailTransport;
614
615        #[async_trait::async_trait]
616        impl Transport for FailTransport {
617            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
618                Err(SyncError::Transport("network down".into()))
619            }
620            async fn pull_events(
621                &self,
622                _since: u64,
623                _limit: usize,
624            ) -> Result<PullResult, SyncError> {
625                Err(SyncError::Transport("network down".into()))
626            }
627        }
628
629        let mut engine = SyncEngine::new(make_config());
630        engine.record(make_event("a")).unwrap();
631
632        let transport = FailTransport;
633        let result = engine.push(&transport).await;
634        assert!(result.is_err());
635        assert!(matches!(result.unwrap_err(), SyncError::Transport(_)));
636        // Failed push must not drop local events.
637        assert_eq!(engine.pending_count(), 1);
638
639        let pull_result = engine.pull(&transport).await;
640        assert!(pull_result.is_err());
641    }
642
643    #[tokio::test]
644    async fn push_only_drains_accepted_events() {
645        /// Transport that only accepts one event from each batch.
646        #[derive(Debug)]
647        struct PartialAcceptTransport;
648
649        #[async_trait::async_trait]
650        impl Transport for PartialAcceptTransport {
651            async fn push_events(&self, _events: &[SyncEvent]) -> Result<PushResult, SyncError> {
652                Ok(PushResult { accepted: 1, remote_head: 1 })
653            }
654
655            async fn pull_events(
656                &self,
657                _since: u64,
658                _limit: usize,
659            ) -> Result<PullResult, SyncError> {
660                Ok(PullResult { events: vec![], remote_head: 1, has_more: false })
661            }
662        }
663
664        let mut engine = SyncEngine::new(make_config());
665        engine.record(make_event("a")).unwrap();
666        engine.record(make_event("b")).unwrap();
667        engine.record(make_event("c")).unwrap();
668
669        let result = engine.push(&PartialAcceptTransport).await.unwrap();
670        assert_eq!(result.accepted, 1);
671        assert_eq!(engine.pending_count(), 2);
672    }
673
674    #[tokio::test]
675    async fn pull_conflict_resolution() {
676        /// Transport that returns events conflicting with local outbox.
677        #[derive(Debug)]
678        struct ConflictTransport;
679
680        #[async_trait::async_trait]
681        impl Transport for ConflictTransport {
682            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
683                Ok(PushResult { accepted: events.len(), remote_head: 10 })
684            }
685            async fn pull_events(
686                &self,
687                _since: u64,
688                _limit: usize,
689            ) -> Result<PullResult, SyncError> {
690                // Return an event for the same entity as the pending local event
691                let remote_event =
692                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
693                        .with_sequence(5);
694                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
695            }
696        }
697
698        let mut engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::RemoteWins);
699        engine
700            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
701            .unwrap();
702
703        let transport = ConflictTransport;
704        let result = engine.pull(&transport).await.unwrap();
705        assert_eq!(result.events.len(), 1);
706        // RemoteWins removes conflicting local outbox events and keeps pulled event.
707        assert_eq!(engine.buffered_count(), 1);
708        assert_eq!(engine.pending_count(), 0);
709    }
710
711    #[tokio::test]
712    async fn pull_conflict_local_wins_keeps_pending_and_skips_remote() {
713        #[derive(Debug)]
714        struct ConflictTransport;
715
716        #[async_trait::async_trait]
717        impl Transport for ConflictTransport {
718            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
719                Ok(PushResult { accepted: events.len(), remote_head: 10 })
720            }
721            async fn pull_events(
722                &self,
723                _since: u64,
724                _limit: usize,
725            ) -> Result<PullResult, SyncError> {
726                let remote_event =
727                    SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "remote"}))
728                        .with_sequence(5);
729                Ok(PullResult { events: vec![remote_event], remote_head: 5, has_more: false })
730            }
731        }
732
733        let mut engine = SyncEngine::with_strategy(make_config(), ConflictStrategy::LocalWins);
734        engine
735            .record(SyncEvent::new("order.updated", "order", "ORD-1", json!({"status": "local"})))
736            .unwrap();
737
738        let result = engine.pull(&ConflictTransport).await.unwrap();
739        assert_eq!(result.events.len(), 1);
740        assert_eq!(engine.pending_count(), 1);
741        assert_eq!(engine.buffered_count(), 0);
742    }
743
    #[tokio::test]
    async fn full_sync_paginates_pull_until_complete() {
        /// Two-page pull transport that records every `since` argument so the
        /// test can verify the continuation cursor advances between pages.
        #[derive(Debug)]
        struct PagingTransport {
            // Count of pull calls; call 0 serves page one, later calls page two.
            pulls: Arc<AtomicU64>,
            // The `since` cursor received on each pull, in call order.
            since_args: Arc<Mutex<Vec<u64>>>,
        }

        #[async_trait::async_trait]
        impl Transport for PagingTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 0 })
            }

            async fn pull_events(
                &self,
                since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                self.since_args.lock().unwrap().push(since);
                let call = self.pulls.fetch_add(1, Ordering::SeqCst);
                if call == 0 {
                    // Page one: single event at sequence 1, more pages pending.
                    Ok(PullResult {
                        events: vec![
                            SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
                                .with_sequence(1),
                        ],
                        // Simulate remote_head as global head watermark, not page cursor.
                        remote_head: 999,
                        has_more: true,
                    })
                } else {
                    // Final page: single event at sequence 2, pagination ends.
                    Ok(PullResult {
                        events: vec![
                            SyncEvent::new("order.updated", "order", "ORD-2", json!({}))
                                .with_sequence(2),
                        ],
                        remote_head: 999,
                        has_more: false,
                    })
                }
            }
        }

        let since_args = Arc::new(Mutex::new(Vec::new()));
        let transport = PagingTransport {
            pulls: Arc::new(AtomicU64::new(0)),
            since_args: Arc::clone(&since_args),
        };
        let mut engine = SyncEngine::new(make_config());
        let (_push_result, pull_result) = engine.full_sync(&transport).await.unwrap();

        // Both pages were merged into one completed pull result and buffered.
        assert_eq!(pull_result.events.len(), 2);
        assert!(!pull_result.has_more);
        assert_eq!(engine.buffered_count(), 2);
        assert_eq!(engine.state().remote_head, 999);
        // Exactly two pulls: the second page was requested from cursor 1
        // (derived from page one's max sequence), not from remote_head.
        assert_eq!(transport.pulls.load(Ordering::SeqCst), 2);
        assert_eq!(&*since_args.lock().unwrap(), &[0, 1]);
    }
803
    #[tokio::test]
    async fn pull_cursor_does_not_advance_from_push_remote_head() {
        /// Transport whose push acks with a far-ahead remote head (1000) while
        /// pull serves a single event at sequence 7; records pull cursors.
        #[derive(Debug)]
        struct HeadSkewTransport {
            // The `since` cursor received on each pull, in call order.
            since_args: Arc<Mutex<Vec<u64>>>,
        }

        #[async_trait::async_trait]
        impl Transport for HeadSkewTransport {
            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
                Ok(PushResult { accepted: events.len(), remote_head: 1000 })
            }

            async fn pull_events(
                &self,
                since: u64,
                _limit: usize,
            ) -> Result<PullResult, SyncError> {
                self.since_args.lock().unwrap().push(since);
                Ok(PullResult {
                    events: vec![
                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
                            .with_sequence(7),
                    ],
                    remote_head: 1000,
                    has_more: false,
                })
            }
        }

        let since_args = Arc::new(Mutex::new(Vec::new()));
        let transport = HeadSkewTransport { since_args: Arc::clone(&since_args) };
        let mut engine = SyncEngine::new(make_config());

        engine.record(make_event("local.pending")).unwrap();
        let (_push, pull) = engine.full_sync(&transport).await.unwrap();

        assert_eq!(pull.events.len(), 1);
        // The pull after the push must start at cursor 0 (nothing pulled yet):
        // the large remote_head the push reported must not leak into the cursor.
        assert_eq!(&*since_args.lock().unwrap(), &[0]);
    }
844
845    #[tokio::test]
846    async fn pull_errors_when_has_more_but_cursor_cannot_advance() {
847        #[derive(Debug)]
848        struct StalledPagingTransport;
849
850        #[async_trait::async_trait]
851        impl Transport for StalledPagingTransport {
852            async fn push_events(&self, events: &[SyncEvent]) -> Result<PushResult, SyncError> {
853                Ok(PushResult { accepted: events.len(), remote_head: 0 })
854            }
855
856            async fn pull_events(
857                &self,
858                _since: u64,
859                _limit: usize,
860            ) -> Result<PullResult, SyncError> {
861                // has_more=true but no event sequence progress, so default cursor
862                // derivation cannot safely continue.
863                Ok(PullResult {
864                    events: vec![
865                        SyncEvent::new("order.updated", "order", "ORD-1", json!({}))
866                            .with_sequence(0),
867                    ],
868                    remote_head: 100,
869                    has_more: true,
870                })
871            }
872        }
873
874        let mut engine = SyncEngine::new(make_config());
875        let err = engine.pull(&StalledPagingTransport).await.unwrap_err();
876        assert!(matches!(err, SyncError::Transport(_)));
877    }
878
879    proptest! {
880        #[test]
881        fn resolve_next_cursor_enforces_monotonic_progress(
882            since in 0u64..20_000,
883            transport_cursor in prop::option::of(0u64..20_000),
884            sequences in prop::collection::vec(0u64..20_000, 0..64),
885        ) {
886            let events: Vec<SyncEvent> = sequences
887                .iter()
888                .enumerate()
889                .map(|(i, seq)| {
890                    SyncEvent::new(
891                        format!("evt-{i}"),
892                        "entity",
893                        format!("id-{i}"),
894                        json!({ "s": seq }),
895                    )
896                    .with_sequence(*seq)
897                })
898                .collect();
899
900            let result = SyncEngine::resolve_next_cursor(since, &events, true, transport_cursor);
901            if let Some(cursor) = transport_cursor {
902                if cursor > since {
903                    prop_assert_eq!(result.unwrap(), Some(cursor));
904                } else {
905                    prop_assert!(matches!(result, Err(SyncError::Transport(_))));
906                }
907            } else if let Some(expected) = derive_next_cursor(since, &events) {
908                prop_assert_eq!(result.unwrap(), Some(expected));
909            } else {
910                prop_assert!(matches!(result, Err(SyncError::Transport(_))));
911            }
912        }
913    }
914
915    proptest! {
916        #[test]
917        fn resolve_next_cursor_returns_none_when_transport_signals_no_more(
918            since in 0u64..20_000,
919            transport_cursor in prop::option::of(0u64..20_000),
920            sequences in prop::collection::vec(0u64..20_000, 0..64),
921        ) {
922            let events: Vec<SyncEvent> = sequences
923                .iter()
924                .enumerate()
925                .map(|(i, seq)| {
926                    SyncEvent::new(
927                        format!("evt-{i}"),
928                        "entity",
929                        format!("id-{i}"),
930                        json!({}),
931                    )
932                    .with_sequence(*seq)
933                })
934                .collect();
935
936            let result = SyncEngine::resolve_next_cursor(since, &events, false, transport_cursor)
937                .unwrap();
938            prop_assert_eq!(result, None);
939        }
940    }
941
942    #[test]
943    fn engine_debug() {
944        let engine = SyncEngine::new(make_config());
945        let debug = format!("{engine:?}");
946        assert!(debug.contains("SyncEngine"));
947    }
948}