taskchampion_sync_server_core/
server.rs

1use crate::error::ServerError;
2use crate::storage::{Snapshot, Storage, StorageTxn};
3use chrono::Utc;
4use uuid::Uuid;
5
6/// The distinguished value for "no version"
7pub const NIL_VERSION_ID: VersionId = Uuid::nil();
8
9/// Number of versions to search back from the latest to find the
10/// version for a newly-added snapshot.  Snapshots for versions older
11/// than this will be rejected.
12const SNAPSHOT_SEARCH_LEN: i32 = 5;
13
14pub type HistorySegment = Vec<u8>;
15pub type ClientId = Uuid;
16pub type VersionId = Uuid;
17
/// ServerConfig contains configuration parameters for the server.
///
/// The [`Default`] implementation targets a snapshot every 14 days or every
/// 100 versions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Target number of days between snapshots.
    pub snapshot_days: i64,

    /// Target number of versions between snapshots.
    pub snapshot_versions: u32,
}

impl Default for ServerConfig {
    /// Build the default configuration: 14 days / 100 versions between snapshots.
    fn default() -> Self {
        Self {
            snapshot_days: 14,
            snapshot_versions: 100,
        }
    }
}
35
36/// Response to get_child_version.  See the protocol documentation.
37#[derive(Clone, PartialEq, Debug)]
38pub enum GetVersionResult {
39    NotFound,
40    Gone,
41    Success {
42        version_id: Uuid,
43        parent_version_id: Uuid,
44        history_segment: HistorySegment,
45    },
46}
47
48/// Response to add_version
49#[derive(Clone, PartialEq, Debug)]
50pub enum AddVersionResult {
51    /// OK, version added with the given ID
52    Ok(VersionId),
53    /// Rejected; expected a version with the given parent version
54    ExpectedParentVersion(VersionId),
55}
56
/// How urgently a client should supply a snapshot; used to create the
/// `X-Snapshot-Request` header.
///
/// Variants are declared from least to most urgent, so the derived ordering
/// satisfies `None < Low < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SnapshotUrgency {
    /// No snapshot is needed at the moment.
    None,

    /// A snapshot would be welcome, but another replica may provide it.
    Low,

    /// A snapshot is needed immediately.
    High,
}
69
70impl SnapshotUrgency {
71    /// Calculate the urgency for a snapshot based on its age in days
72    fn for_days(config: &ServerConfig, days: i64) -> Self {
73        if days >= config.snapshot_days * 3 / 2 {
74            SnapshotUrgency::High
75        } else if days >= config.snapshot_days {
76            SnapshotUrgency::Low
77        } else {
78            SnapshotUrgency::None
79        }
80    }
81
82    /// Calculate the urgency for a snapshot based on its age in versions
83    fn for_versions_since(config: &ServerConfig, versions_since: u32) -> Self {
84        if versions_since >= config.snapshot_versions * 3 / 2 {
85            SnapshotUrgency::High
86        } else if versions_since >= config.snapshot_versions {
87            SnapshotUrgency::Low
88        } else {
89            SnapshotUrgency::None
90        }
91    }
92}
93
94/// A server implementing the TaskChampion sync protocol.
95pub struct Server {
96    config: ServerConfig,
97    storage: Box<dyn Storage>,
98}
99
100impl Server {
101    pub fn new<ST: Storage + 'static>(config: ServerConfig, storage: ST) -> Self {
102        Self {
103            config,
104            storage: Box::new(storage),
105        }
106    }
107
108    /// Implementation of the GetChildVersion protocol transaction.
109    pub async fn get_child_version(
110        &self,
111        client_id: ClientId,
112        parent_version_id: VersionId,
113    ) -> Result<GetVersionResult, ServerError> {
114        let mut txn = self.txn(client_id).await?;
115        let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
116
117        // If a version with parentVersionId equal to the requested parentVersionId exists, it is
118        // returned.
119        if let Some(version) = txn.get_version_by_parent(parent_version_id).await? {
120            return Ok(GetVersionResult::Success {
121                version_id: version.version_id,
122                parent_version_id: version.parent_version_id,
123                history_segment: version.history_segment,
124            });
125        }
126
127        // Return NotFound if an AddVersion with this parent_version_id would succeed, and
128        // otherwise return Gone.
129        //
130        // AddVersion will succeed if either
131        //  - the requested parent version is the latest version; or
132        //  - there is no latest version, meaning there are no versions stored for this client
133        Ok(
134            if client.latest_version_id == parent_version_id
135                || client.latest_version_id == NIL_VERSION_ID
136            {
137                GetVersionResult::NotFound
138            } else {
139                GetVersionResult::Gone
140            },
141        )
142    }
143
144    /// Implementation of the AddVersion protocol transaction
145    pub async fn add_version(
146        &self,
147        client_id: ClientId,
148        parent_version_id: VersionId,
149        history_segment: HistorySegment,
150    ) -> Result<(AddVersionResult, SnapshotUrgency), ServerError> {
151        log::debug!("add_version(client_id: {client_id}, parent_version_id: {parent_version_id})");
152
153        let mut txn = self.txn(client_id).await?;
154        let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
155
156        // check if this version is acceptable, under the protection of the transaction
157        if client.latest_version_id != NIL_VERSION_ID
158            && parent_version_id != client.latest_version_id
159        {
160            log::debug!("add_version request rejected: mismatched latest_version_id");
161            return Ok((
162                AddVersionResult::ExpectedParentVersion(client.latest_version_id),
163                SnapshotUrgency::None,
164            ));
165        }
166
167        // invent a version ID
168        let version_id = Uuid::new_v4();
169        log::debug!("add_version request accepted: new version_id: {version_id}");
170
171        // update the DB
172        txn.add_version(version_id, parent_version_id, history_segment)
173            .await?;
174        txn.commit().await?;
175
176        // calculate the urgency
177        let time_urgency = match client.snapshot {
178            None => SnapshotUrgency::High,
179            Some(Snapshot { timestamp, .. }) => {
180                SnapshotUrgency::for_days(&self.config, (Utc::now() - timestamp).num_days())
181            }
182        };
183
184        let version_urgency = match client.snapshot {
185            None => SnapshotUrgency::High,
186            Some(Snapshot { versions_since, .. }) => {
187                SnapshotUrgency::for_versions_since(&self.config, versions_since)
188            }
189        };
190
191        Ok((
192            AddVersionResult::Ok(version_id),
193            std::cmp::max(time_urgency, version_urgency),
194        ))
195    }
196
197    /// Implementation of the AddSnapshot protocol transaction
198    pub async fn add_snapshot(
199        &self,
200        client_id: ClientId,
201        version_id: VersionId,
202        data: Vec<u8>,
203    ) -> Result<(), ServerError> {
204        log::debug!("add_snapshot(client_id: {client_id}, version_id: {version_id})");
205
206        let mut txn = self.txn(client_id).await?;
207        let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
208
209        // NOTE: if the snapshot is rejected, this function logs about it and returns
210        // Ok(()), as there's no reason to report an errot to the client / user.
211
212        let last_snapshot = client.snapshot.map(|snap| snap.version_id);
213        if Some(version_id) == last_snapshot {
214            log::debug!("rejecting snapshot for version {version_id}: already exists");
215            return Ok(());
216        }
217
218        // look for this version in the history of this client, starting at the latest version, and
219        // only iterating for a limited number of versions.
220        let mut search_len = SNAPSHOT_SEARCH_LEN;
221        let mut vid = client.latest_version_id;
222
223        loop {
224            if vid == version_id && version_id != NIL_VERSION_ID {
225                // the new snapshot is for a recent version, so proceed
226                break;
227            }
228
229            if Some(vid) == last_snapshot {
230                // the new snapshot is older than the last snapshot, so ignore it
231                log::debug!("rejecting snapshot for version {version_id}: newer snapshot already exists or no such version");
232                return Ok(());
233            }
234
235            search_len -= 1;
236            if search_len <= 0 || vid == NIL_VERSION_ID {
237                // this should not happen in normal operation, so warn about it
238                log::warn!("rejecting snapshot for version {version_id}: version is too old or no such version");
239                return Ok(());
240            }
241
242            // get the parent version ID
243            if let Some(parent) = txn.get_version(vid).await? {
244                vid = parent.parent_version_id;
245            } else {
246                // this version does not exist; "this should not happen" but if it does,
247                // we don't need a snapshot earlier than the missing version.
248                log::warn!("rejecting snapshot for version {version_id}: newer versions have already been deleted");
249                return Ok(());
250            }
251        }
252
253        log::debug!("accepting snapshot for version {version_id}");
254        txn.set_snapshot(
255            Snapshot {
256                version_id,
257                timestamp: Utc::now(),
258                versions_since: 0,
259            },
260            data,
261        )
262        .await?;
263        txn.commit().await?;
264        Ok(())
265    }
266
267    /// Implementation of the GetSnapshot protocol transaction
268    pub async fn get_snapshot(
269        &self,
270        client_id: ClientId,
271    ) -> Result<Option<(Uuid, Vec<u8>)>, ServerError> {
272        let mut txn = self.txn(client_id).await?;
273        let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
274
275        Ok(if let Some(snap) = client.snapshot {
276            txn.get_snapshot_data(snap.version_id)
277                .await?
278                .map(|data| (snap.version_id, data))
279        } else {
280            None
281        })
282    }
283
284    /// Convenience method to get a transaction for the embedded storage.
285    pub async fn txn(&self, client_id: Uuid) -> Result<Box<dyn StorageTxn + '_>, ServerError> {
286        Ok(self.storage.txn(client_id).await?)
287    }
288}
289
290#[cfg(test)]
291mod test {
292    use super::*;
293    use crate::inmemory::InMemoryStorage;
294    use crate::storage::{Snapshot, Storage};
295    use chrono::{Duration, TimeZone, Utc};
296    use pretty_assertions::assert_eq;
297
298    /// Set up for a test, returning storage and a client_id.
299    fn setup() -> (InMemoryStorage, Uuid) {
300        let _ = env_logger::builder().is_test(true).try_init();
301        let storage = InMemoryStorage::new();
302        let client_id = Uuid::new_v4();
303        (storage, client_id)
304    }
305
306    /// Convert storage into a Server.
307    fn into_server(storage: InMemoryStorage) -> Server {
308        Server::new(ServerConfig::default(), storage)
309    }
310
311    /// Add versions to the DB for the given client.
312    async fn add_versions(
313        storage: &InMemoryStorage,
314        client_id: Uuid,
315        num_versions: u32,
316        snapshot_version: Option<u32>,
317        snapshot_days_ago: Option<i64>,
318    ) -> anyhow::Result<Vec<Uuid>> {
319        let mut txn = storage.txn(client_id).await?;
320        let mut versions = vec![];
321
322        let mut version_id = Uuid::nil();
323        txn.new_client(Uuid::nil()).await?;
324        assert!(
325            num_versions < u8::MAX.into(),
326            "we cast the version number to u8"
327        );
328        for vnum in 0..num_versions {
329            let parent_version_id = version_id;
330            version_id = Uuid::new_v4();
331            versions.push(version_id);
332            txn.add_version(
333                version_id,
334                parent_version_id,
335                // Generate some unique data for this version.
336                vec![0, 0, vnum as u8],
337            )
338            .await?;
339            if Some(vnum) == snapshot_version {
340                txn.set_snapshot(
341                    Snapshot {
342                        version_id,
343                        versions_since: 0,
344                        timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)),
345                    },
346                    // Generate some unique data for this snapshot.
347                    vec![vnum as u8],
348                )
349                .await?;
350            }
351        }
352        txn.commit().await?;
353        Ok(versions)
354    }
355
356    /// Utility function to check the results of an add_version call
357    async fn av_success_check(
358        server: &Server,
359        client_id: Uuid,
360        existing_versions: &[Uuid],
361        (add_version_result, snapshot_urgency): (AddVersionResult, SnapshotUrgency),
362        expected_history: Vec<u8>,
363        expected_urgency: SnapshotUrgency,
364    ) -> anyhow::Result<()> {
365        if let AddVersionResult::Ok(new_version_id) = add_version_result {
366            // check that it invented a new version ID
367            for v in existing_versions {
368                assert_ne!(&new_version_id, v);
369            }
370
371            // verify that the storage was updated
372            let mut txn = server.txn(client_id).await?;
373            let client = txn.get_client().await?.unwrap();
374            assert_eq!(client.latest_version_id, new_version_id);
375
376            let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil);
377            let version = txn.get_version(new_version_id).await?.unwrap();
378            assert_eq!(version.version_id, new_version_id);
379            assert_eq!(version.parent_version_id, parent_version_id);
380            assert_eq!(version.history_segment, expected_history);
381        } else {
382            panic!("did not get Ok from add_version: {add_version_result:?}");
383        }
384
385        assert_eq!(snapshot_urgency, expected_urgency);
386
387        Ok(())
388    }
389
390    #[test]
391    fn snapshot_urgency_max() {
392        use SnapshotUrgency::*;
393        assert_eq!(std::cmp::max(None, None), None);
394        assert_eq!(std::cmp::max(None, Low), Low);
395        assert_eq!(std::cmp::max(None, High), High);
396        assert_eq!(std::cmp::max(Low, None), Low);
397        assert_eq!(std::cmp::max(Low, Low), Low);
398        assert_eq!(std::cmp::max(Low, High), High);
399        assert_eq!(std::cmp::max(High, None), High);
400        assert_eq!(std::cmp::max(High, Low), High);
401        assert_eq!(std::cmp::max(High, High), High);
402    }
403
404    #[test]
405    fn snapshot_urgency_for_days() {
406        use SnapshotUrgency::*;
407        let config = ServerConfig::default();
408        assert_eq!(SnapshotUrgency::for_days(&config, 0), None);
409        assert_eq!(
410            SnapshotUrgency::for_days(&config, config.snapshot_days),
411            Low
412        );
413        assert_eq!(
414            SnapshotUrgency::for_days(&config, config.snapshot_days * 2),
415            High
416        );
417    }
418
419    #[test]
420    fn snapshot_urgency_for_versions_since() {
421        use SnapshotUrgency::*;
422        let config = ServerConfig::default();
423        assert_eq!(SnapshotUrgency::for_versions_since(&config, 0), None);
424        assert_eq!(
425            SnapshotUrgency::for_versions_since(&config, config.snapshot_versions),
426            Low
427        );
428        assert_eq!(
429            SnapshotUrgency::for_versions_since(&config, config.snapshot_versions * 2),
430            High
431        );
432    }
433
434    #[tokio::test]
435    async fn get_child_version_not_found_initial_nil() -> anyhow::Result<()> {
436        let (storage, client_id) = setup();
437        {
438            let mut txn = storage.txn(client_id).await?;
439            txn.new_client(NIL_VERSION_ID).await?;
440            txn.commit().await?;
441        }
442
443        let server = into_server(storage);
444
445        // when no latest version exists, the first version is NotFound
446        assert_eq!(
447            server.get_child_version(client_id, NIL_VERSION_ID).await?,
448            GetVersionResult::NotFound
449        );
450        Ok(())
451    }
452
453    #[tokio::test]
454    async fn get_child_version_not_found_initial_continuing() -> anyhow::Result<()> {
455        let (storage, client_id) = setup();
456        {
457            let mut txn = storage.txn(client_id).await?;
458            txn.new_client(NIL_VERSION_ID).await?;
459            txn.commit().await?;
460        }
461
462        let server = into_server(storage);
463
464        // when no latest version exists, _any_ child version is NOT_FOUND. This allows syncs to
465        // start to a new server even if the client already has been uploading to another service.
466        assert_eq!(
467            server.get_child_version(client_id, Uuid::new_v4(),).await?,
468            GetVersionResult::NotFound
469        );
470        Ok(())
471    }
472
473    #[tokio::test]
474    async fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> {
475        let (storage, client_id) = setup();
476        let parent_version_id = Uuid::new_v4();
477        {
478            let mut txn = storage.txn(client_id).await?;
479            // add a parent version, but not the requested child version
480            txn.new_client(parent_version_id).await?;
481            txn.add_version(parent_version_id, NIL_VERSION_ID, vec![])
482                .await?;
483            txn.commit().await?;
484        }
485
486        let server = into_server(storage);
487        assert_eq!(
488            server
489                .get_child_version(client_id, parent_version_id)
490                .await?,
491            GetVersionResult::NotFound
492        );
493        Ok(())
494    }
495
496    #[tokio::test]
497    async fn get_child_version_gone_not_latest() -> anyhow::Result<()> {
498        let (storage, client_id) = setup();
499        let parent_version_id = Uuid::new_v4();
500        {
501            let mut txn = storage.txn(client_id).await?;
502            // Add a parent version, but not the requested parent version
503            txn.new_client(parent_version_id).await?;
504            txn.add_version(parent_version_id, NIL_VERSION_ID, vec![])
505                .await?;
506            txn.commit().await?;
507        }
508
509        let server = into_server(storage);
510        assert_eq!(
511            server.get_child_version(client_id, Uuid::new_v4(),).await?,
512            GetVersionResult::Gone
513        );
514        Ok(())
515    }
516
517    #[tokio::test]
518    async fn get_child_version_found() -> anyhow::Result<()> {
519        let (storage, client_id) = setup();
520        let version_id = Uuid::new_v4();
521        let parent_version_id = Uuid::new_v4();
522        let history_segment = b"abcd".to_vec();
523        {
524            let mut txn = storage.txn(client_id).await?;
525            txn.new_client(version_id).await?;
526            txn.add_version(version_id, parent_version_id, history_segment.clone())
527                .await?;
528            txn.commit().await?;
529        }
530
531        let server = into_server(storage);
532        assert_eq!(
533            server
534                .get_child_version(client_id, parent_version_id)
535                .await?,
536            GetVersionResult::Success {
537                version_id,
538                parent_version_id,
539                history_segment,
540            }
541        );
542        Ok(())
543    }
544
545    #[tokio::test]
546    async fn add_version_conflict() -> anyhow::Result<()> {
547        let (storage, client_id) = setup();
548        let versions = add_versions(&storage, client_id, 3, None, None).await?;
549
550        // try to add a child of a version other than the latest
551        let server = into_server(storage);
552        assert_eq!(
553            server
554                .add_version(client_id, versions[1], vec![3, 6, 9])
555                .await?
556                .0,
557            AddVersionResult::ExpectedParentVersion(versions[2])
558        );
559
560        // verify that the storage wasn't updated
561        let mut txn = server.txn(client_id).await?;
562        assert_eq!(
563            txn.get_client().await?.unwrap().latest_version_id,
564            versions[2]
565        );
566        assert_eq!(txn.get_version_by_parent(versions[2]).await?, None);
567
568        Ok(())
569    }
570
571    #[tokio::test]
572    async fn add_version_with_existing_history() -> anyhow::Result<()> {
573        let (storage, client_id) = setup();
574        let versions = add_versions(&storage, client_id, 1, None, None).await?;
575
576        let server = into_server(storage);
577        let result = server
578            .add_version(client_id, versions[0], vec![3, 6, 9])
579            .await?;
580
581        av_success_check(
582            &server,
583            client_id,
584            &versions,
585            result,
586            vec![3, 6, 9],
587            // urgency=high because there are no snapshots yet
588            SnapshotUrgency::High,
589        )
590        .await?;
591
592        Ok(())
593    }
594
595    #[tokio::test]
596    async fn add_version_with_no_history() -> anyhow::Result<()> {
597        let (storage, client_id) = setup();
598        let versions = add_versions(&storage, client_id, 0, None, None).await?;
599
600        let server = into_server(storage);
601        let parent_version_id = Uuid::nil();
602        let result = server
603            .add_version(client_id, parent_version_id, vec![3, 6, 9])
604            .await?;
605
606        av_success_check(
607            &server,
608            client_id,
609            &versions,
610            result,
611            vec![3, 6, 9],
612            // urgency=high because there are no snapshots yet
613            SnapshotUrgency::High,
614        )
615        .await?;
616
617        Ok(())
618    }
619
620    #[tokio::test]
621    async fn add_version_success_recent_snapshot() -> anyhow::Result<()> {
622        let (storage, client_id) = setup();
623        let versions = add_versions(&storage, client_id, 1, Some(0), None).await?;
624
625        let server = into_server(storage);
626        let result = server
627            .add_version(client_id, versions[0], vec![1, 2, 3])
628            .await?;
629
630        av_success_check(
631            &server,
632            client_id,
633            &versions,
634            result,
635            vec![1, 2, 3],
636            // no snapshot request since the previous version has a snapshot
637            SnapshotUrgency::None,
638        )
639        .await?;
640
641        Ok(())
642    }
643
644    #[tokio::test]
645    async fn add_version_success_aged_snapshot() -> anyhow::Result<()> {
646        // one snapshot, but it was 50 days ago
647        let (storage, client_id) = setup();
648        let versions = add_versions(&storage, client_id, 1, Some(0), Some(50)).await?;
649
650        let server = into_server(storage);
651        let result = server
652            .add_version(client_id, versions[0], vec![1, 2, 3])
653            .await?;
654
655        av_success_check(
656            &server,
657            client_id,
658            &versions,
659            result,
660            vec![1, 2, 3],
661            // urgency=high due to days since the snapshot
662            SnapshotUrgency::High,
663        )
664        .await?;
665
666        Ok(())
667    }
668
669    #[tokio::test]
670    async fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> {
671        // one snapshot, but it was 50 versions ago
672        let (storage, client_id) = setup();
673        let versions = add_versions(&storage, client_id, 50, Some(0), None).await?;
674
675        let mut server = into_server(storage);
676        server.config.snapshot_versions = 30;
677
678        let result = server
679            .add_version(client_id, versions[49], vec![1, 2, 3])
680            .await?;
681
682        av_success_check(
683            &server,
684            client_id,
685            &versions,
686            result,
687            vec![1, 2, 3],
688            // urgency=high due to number of versions since the snapshot
689            SnapshotUrgency::High,
690        )
691        .await?;
692
693        Ok(())
694    }
695
696    #[tokio::test]
697    async fn add_snapshot_success_latest() -> anyhow::Result<()> {
698        let (storage, client_id) = setup();
699        let version_id = Uuid::new_v4();
700
701        {
702            let mut txn = storage.txn(client_id).await?;
703            // set up a task DB with one version in it
704            txn.new_client(version_id).await?;
705            txn.add_version(version_id, NIL_VERSION_ID, vec![]).await?;
706
707            txn.commit().await?;
708        }
709
710        let server = into_server(storage);
711        server
712            .add_snapshot(client_id, version_id, vec![1, 2, 3])
713            .await?;
714
715        // verify the snapshot
716        let mut txn = server.txn(client_id).await?;
717        let client = txn.get_client().await?.unwrap();
718        let snapshot = client.snapshot.unwrap();
719        assert_eq!(snapshot.version_id, version_id);
720        assert_eq!(snapshot.versions_since, 0);
721        assert_eq!(
722            txn.get_snapshot_data(version_id).await.unwrap(),
723            Some(vec![1, 2, 3])
724        );
725
726        Ok(())
727    }
728
729    #[tokio::test]
730    async fn add_snapshot_success_older() -> anyhow::Result<()> {
731        let (storage, client_id) = setup();
732        let version_id_1 = Uuid::new_v4();
733        let version_id_2 = Uuid::new_v4();
734
735        {
736            let mut txn = storage.txn(client_id).await?;
737            // set up a task DB with two versions in it
738            txn.new_client(version_id_2).await?;
739            txn.add_version(version_id_1, NIL_VERSION_ID, vec![])
740                .await?;
741            txn.add_version(version_id_2, version_id_1, vec![]).await?;
742
743            txn.commit().await?;
744        }
745
746        // add a snapshot for version 1
747        let server = into_server(storage);
748        server
749            .add_snapshot(client_id, version_id_1, vec![1, 2, 3])
750            .await?;
751
752        // verify the snapshot
753        let mut txn = server.txn(client_id).await?;
754        let client = txn.get_client().await?.unwrap();
755        let snapshot = client.snapshot.unwrap();
756        assert_eq!(snapshot.version_id, version_id_1);
757        assert_eq!(snapshot.versions_since, 0);
758        assert_eq!(
759            txn.get_snapshot_data(version_id_1).await.unwrap(),
760            Some(vec![1, 2, 3])
761        );
762
763        Ok(())
764    }
765
766    #[tokio::test]
767    async fn add_snapshot_fails_no_such() -> anyhow::Result<()> {
768        let (storage, client_id) = setup();
769        let version_id_1 = Uuid::new_v4();
770        let version_id_2 = Uuid::new_v4();
771
772        {
773            let mut txn = storage.txn(client_id).await?;
774            // set up a task DB with two versions in it
775            txn.new_client(version_id_2).await?;
776            txn.add_version(version_id_1, NIL_VERSION_ID, vec![])
777                .await?;
778            txn.add_version(version_id_2, version_id_1, vec![]).await?;
779
780            txn.commit().await?;
781        }
782
783        // add a snapshot for unknown version
784        let server = into_server(storage);
785        let version_id_unk = Uuid::new_v4();
786        server
787            .add_snapshot(client_id, version_id_unk, vec![1, 2, 3])
788            .await?;
789
790        // verify the snapshot does not exist
791        let mut txn = server.txn(client_id).await?;
792        let client = txn.get_client().await?.unwrap();
793        assert!(client.snapshot.is_none());
794
795        Ok(())
796    }
797
798    #[tokio::test]
799    async fn add_snapshot_fails_too_old() -> anyhow::Result<()> {
800        let (storage, client_id) = setup();
801        let mut version_id = Uuid::new_v4();
802        let mut parent_version_id = Uuid::nil();
803        let mut version_ids = vec![];
804
805        {
806            let mut txn = storage.txn(client_id).await?;
807            // set up a task DB with 10 versions in it (oldest to newest)
808            txn.new_client(Uuid::nil()).await?;
809            for _ in 0..10 {
810                txn.add_version(version_id, parent_version_id, vec![])
811                    .await?;
812                version_ids.push(version_id);
813                parent_version_id = version_id;
814                version_id = Uuid::new_v4();
815            }
816
817            txn.commit().await?;
818        }
819
820        // add a snapshot for the earliest of those
821        let server = into_server(storage);
822        server
823            .add_snapshot(client_id, version_ids[0], vec![1, 2, 3])
824            .await?;
825
826        // verify the snapshot does not exist
827        let mut txn = server.txn(client_id).await?;
828        let client = txn.get_client().await?.unwrap();
829        assert!(client.snapshot.is_none());
830
831        Ok(())
832    }
833
834    #[tokio::test]
835    async fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> {
836        let (storage, client_id) = setup();
837        let mut version_id = Uuid::new_v4();
838        let mut parent_version_id = Uuid::nil();
839        let mut version_ids = vec![];
840
841        {
842            let mut txn = storage.txn(client_id).await?;
843            // set up a task DB with 5 versions in it (oldest to newest) and a snapshot of the
844            // middle one
845            txn.new_client(Uuid::nil()).await?;
846            for _ in 0..5 {
847                txn.add_version(version_id, parent_version_id, vec![])
848                    .await?;
849                version_ids.push(version_id);
850                parent_version_id = version_id;
851                version_id = Uuid::new_v4();
852            }
853            txn.set_snapshot(
854                Snapshot {
855                    version_id: version_ids[2],
856                    versions_since: 2,
857                    timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(),
858                },
859                vec![1, 2, 3],
860            )
861            .await?;
862
863            txn.commit().await?;
864        }
865
866        // add a snapshot for the earliest of those
867        let server = into_server(storage);
868        server
869            .add_snapshot(client_id, version_ids[0], vec![9, 9, 9])
870            .await?;
871
872        // verify the snapshot was not replaced
873        let mut txn = server.txn(client_id).await?;
874        let client = txn.get_client().await?.unwrap();
875        let snapshot = client.snapshot.unwrap();
876        assert_eq!(snapshot.version_id, version_ids[2]);
877        assert_eq!(snapshot.versions_since, 2);
878        assert_eq!(
879            txn.get_snapshot_data(version_ids[2]).await.unwrap(),
880            Some(vec![1, 2, 3])
881        );
882
883        Ok(())
884    }
885
886    #[tokio::test]
887    async fn add_snapshot_fails_nil_version() -> anyhow::Result<()> {
888        let (storage, client_id) = setup();
889        {
890            let mut txn = storage.txn(client_id).await?;
891            // just set up the client
892            txn.new_client(NIL_VERSION_ID).await?;
893            txn.commit().await?;
894        }
895
896        let server = into_server(storage);
897        server
898            .add_snapshot(client_id, NIL_VERSION_ID, vec![9, 9, 9])
899            .await?;
900
901        // verify the snapshot does not exist
902        let mut txn = server.txn(client_id).await?;
903        let client = txn.get_client().await?.unwrap();
904        assert!(client.snapshot.is_none());
905
906        Ok(())
907    }
908
909    #[tokio::test]
910    async fn get_snapshot_found() -> anyhow::Result<()> {
911        let (storage, client_id) = setup();
912        let data = vec![1, 2, 3];
913        let snapshot_version_id = Uuid::new_v4();
914
915        {
916            let mut txn = storage.txn(client_id).await?;
917            txn.new_client(snapshot_version_id).await?;
918            txn.set_snapshot(
919                Snapshot {
920                    version_id: snapshot_version_id,
921                    versions_since: 3,
922                    timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(),
923                },
924                data.clone(),
925            )
926            .await?;
927            txn.commit().await?;
928        }
929
930        let server = into_server(storage);
931        assert_eq!(
932            server.get_snapshot(client_id).await?,
933            Some((snapshot_version_id, data))
934        );
935
936        Ok(())
937    }
938
939    #[tokio::test]
940    async fn get_snapshot_not_found() -> anyhow::Result<()> {
941        let (storage, client_id) = setup();
942        {
943            let mut txn = storage.txn(client_id).await?;
944            txn.new_client(NIL_VERSION_ID).await?;
945            txn.commit().await?;
946        }
947
948        let server = into_server(storage);
949        assert_eq!(server.get_snapshot(client_id).await?, None);
950
951        Ok(())
952    }
953}