1use crate::error::ServerError;
2use crate::storage::{Snapshot, Storage, StorageTxn};
3use chrono::Utc;
4use uuid::Uuid;
5
6pub const NIL_VERSION_ID: VersionId = Uuid::nil();
8
9const SNAPSHOT_SEARCH_LEN: i32 = 5;
13
14pub type HistorySegment = Vec<u8>;
15pub type ClientId = Uuid;
16pub type VersionId = Uuid;
17
18pub struct ServerConfig {
20 pub snapshot_days: i64,
22
23 pub snapshot_versions: u32,
25}
26
27impl Default for ServerConfig {
28 fn default() -> Self {
29 ServerConfig {
30 snapshot_days: 14,
31 snapshot_versions: 100,
32 }
33 }
34}
35
36#[derive(Clone, PartialEq, Debug)]
38pub enum GetVersionResult {
39 NotFound,
40 Gone,
41 Success {
42 version_id: Uuid,
43 parent_version_id: Uuid,
44 history_segment: HistorySegment,
45 },
46}
47
48#[derive(Clone, PartialEq, Debug)]
50pub enum AddVersionResult {
51 Ok(VersionId),
53 ExpectedParentVersion(VersionId),
55}
56
57#[derive(PartialEq, Debug, Clone, Copy, Eq, PartialOrd, Ord)]
59pub enum SnapshotUrgency {
60 None,
62
63 Low,
65
66 High,
68}
69
70impl SnapshotUrgency {
71 fn for_days(config: &ServerConfig, days: i64) -> Self {
73 if days >= config.snapshot_days * 3 / 2 {
74 SnapshotUrgency::High
75 } else if days >= config.snapshot_days {
76 SnapshotUrgency::Low
77 } else {
78 SnapshotUrgency::None
79 }
80 }
81
82 fn for_versions_since(config: &ServerConfig, versions_since: u32) -> Self {
84 if versions_since >= config.snapshot_versions * 3 / 2 {
85 SnapshotUrgency::High
86 } else if versions_since >= config.snapshot_versions {
87 SnapshotUrgency::Low
88 } else {
89 SnapshotUrgency::None
90 }
91 }
92}
93
94pub struct Server {
96 config: ServerConfig,
97 storage: Box<dyn Storage>,
98}
99
100impl Server {
101 pub fn new<ST: Storage + 'static>(config: ServerConfig, storage: ST) -> Self {
102 Self {
103 config,
104 storage: Box::new(storage),
105 }
106 }
107
108 pub async fn get_child_version(
110 &self,
111 client_id: ClientId,
112 parent_version_id: VersionId,
113 ) -> Result<GetVersionResult, ServerError> {
114 let mut txn = self.txn(client_id).await?;
115 let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
116
117 if let Some(version) = txn.get_version_by_parent(parent_version_id).await? {
120 return Ok(GetVersionResult::Success {
121 version_id: version.version_id,
122 parent_version_id: version.parent_version_id,
123 history_segment: version.history_segment,
124 });
125 }
126
127 Ok(
134 if client.latest_version_id == parent_version_id
135 || client.latest_version_id == NIL_VERSION_ID
136 {
137 GetVersionResult::NotFound
138 } else {
139 GetVersionResult::Gone
140 },
141 )
142 }
143
144 pub async fn add_version(
146 &self,
147 client_id: ClientId,
148 parent_version_id: VersionId,
149 history_segment: HistorySegment,
150 ) -> Result<(AddVersionResult, SnapshotUrgency), ServerError> {
151 log::debug!("add_version(client_id: {client_id}, parent_version_id: {parent_version_id})");
152
153 let mut txn = self.txn(client_id).await?;
154 let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
155
156 if client.latest_version_id != NIL_VERSION_ID
158 && parent_version_id != client.latest_version_id
159 {
160 log::debug!("add_version request rejected: mismatched latest_version_id");
161 return Ok((
162 AddVersionResult::ExpectedParentVersion(client.latest_version_id),
163 SnapshotUrgency::None,
164 ));
165 }
166
167 let version_id = Uuid::new_v4();
169 log::debug!("add_version request accepted: new version_id: {version_id}");
170
171 txn.add_version(version_id, parent_version_id, history_segment)
173 .await?;
174 txn.commit().await?;
175
176 let time_urgency = match client.snapshot {
178 None => SnapshotUrgency::High,
179 Some(Snapshot { timestamp, .. }) => {
180 SnapshotUrgency::for_days(&self.config, (Utc::now() - timestamp).num_days())
181 }
182 };
183
184 let version_urgency = match client.snapshot {
185 None => SnapshotUrgency::High,
186 Some(Snapshot { versions_since, .. }) => {
187 SnapshotUrgency::for_versions_since(&self.config, versions_since)
188 }
189 };
190
191 Ok((
192 AddVersionResult::Ok(version_id),
193 std::cmp::max(time_urgency, version_urgency),
194 ))
195 }
196
197 pub async fn add_snapshot(
199 &self,
200 client_id: ClientId,
201 version_id: VersionId,
202 data: Vec<u8>,
203 ) -> Result<(), ServerError> {
204 log::debug!("add_snapshot(client_id: {client_id}, version_id: {version_id})");
205
206 let mut txn = self.txn(client_id).await?;
207 let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
208
209 let last_snapshot = client.snapshot.map(|snap| snap.version_id);
213 if Some(version_id) == last_snapshot {
214 log::debug!("rejecting snapshot for version {version_id}: already exists");
215 return Ok(());
216 }
217
218 let mut search_len = SNAPSHOT_SEARCH_LEN;
221 let mut vid = client.latest_version_id;
222
223 loop {
224 if vid == version_id && version_id != NIL_VERSION_ID {
225 break;
227 }
228
229 if Some(vid) == last_snapshot {
230 log::debug!("rejecting snapshot for version {version_id}: newer snapshot already exists or no such version");
232 return Ok(());
233 }
234
235 search_len -= 1;
236 if search_len <= 0 || vid == NIL_VERSION_ID {
237 log::warn!("rejecting snapshot for version {version_id}: version is too old or no such version");
239 return Ok(());
240 }
241
242 if let Some(parent) = txn.get_version(vid).await? {
244 vid = parent.parent_version_id;
245 } else {
246 log::warn!("rejecting snapshot for version {version_id}: newer versions have already been deleted");
249 return Ok(());
250 }
251 }
252
253 log::debug!("accepting snapshot for version {version_id}");
254 txn.set_snapshot(
255 Snapshot {
256 version_id,
257 timestamp: Utc::now(),
258 versions_since: 0,
259 },
260 data,
261 )
262 .await?;
263 txn.commit().await?;
264 Ok(())
265 }
266
267 pub async fn get_snapshot(
269 &self,
270 client_id: ClientId,
271 ) -> Result<Option<(Uuid, Vec<u8>)>, ServerError> {
272 let mut txn = self.txn(client_id).await?;
273 let client = txn.get_client().await?.ok_or(ServerError::NoSuchClient)?;
274
275 Ok(if let Some(snap) = client.snapshot {
276 txn.get_snapshot_data(snap.version_id)
277 .await?
278 .map(|data| (snap.version_id, data))
279 } else {
280 None
281 })
282 }
283
284 pub async fn txn(&self, client_id: Uuid) -> Result<Box<dyn StorageTxn + '_>, ServerError> {
286 Ok(self.storage.txn(client_id).await?)
287 }
288}
289
290#[cfg(test)]
291mod test {
292 use super::*;
293 use crate::inmemory::InMemoryStorage;
294 use crate::storage::{Snapshot, Storage};
295 use chrono::{Duration, TimeZone, Utc};
296 use pretty_assertions::assert_eq;
297
298 fn setup() -> (InMemoryStorage, Uuid) {
300 let _ = env_logger::builder().is_test(true).try_init();
301 let storage = InMemoryStorage::new();
302 let client_id = Uuid::new_v4();
303 (storage, client_id)
304 }
305
306 fn into_server(storage: InMemoryStorage) -> Server {
308 Server::new(ServerConfig::default(), storage)
309 }
310
311 async fn add_versions(
313 storage: &InMemoryStorage,
314 client_id: Uuid,
315 num_versions: u32,
316 snapshot_version: Option<u32>,
317 snapshot_days_ago: Option<i64>,
318 ) -> anyhow::Result<Vec<Uuid>> {
319 let mut txn = storage.txn(client_id).await?;
320 let mut versions = vec![];
321
322 let mut version_id = Uuid::nil();
323 txn.new_client(Uuid::nil()).await?;
324 assert!(
325 num_versions < u8::MAX.into(),
326 "we cast the version number to u8"
327 );
328 for vnum in 0..num_versions {
329 let parent_version_id = version_id;
330 version_id = Uuid::new_v4();
331 versions.push(version_id);
332 txn.add_version(
333 version_id,
334 parent_version_id,
335 vec![0, 0, vnum as u8],
337 )
338 .await?;
339 if Some(vnum) == snapshot_version {
340 txn.set_snapshot(
341 Snapshot {
342 version_id,
343 versions_since: 0,
344 timestamp: Utc::now() - Duration::days(snapshot_days_ago.unwrap_or(0)),
345 },
346 vec![vnum as u8],
348 )
349 .await?;
350 }
351 }
352 txn.commit().await?;
353 Ok(versions)
354 }
355
356 async fn av_success_check(
358 server: &Server,
359 client_id: Uuid,
360 existing_versions: &[Uuid],
361 (add_version_result, snapshot_urgency): (AddVersionResult, SnapshotUrgency),
362 expected_history: Vec<u8>,
363 expected_urgency: SnapshotUrgency,
364 ) -> anyhow::Result<()> {
365 if let AddVersionResult::Ok(new_version_id) = add_version_result {
366 for v in existing_versions {
368 assert_ne!(&new_version_id, v);
369 }
370
371 let mut txn = server.txn(client_id).await?;
373 let client = txn.get_client().await?.unwrap();
374 assert_eq!(client.latest_version_id, new_version_id);
375
376 let parent_version_id = existing_versions.last().cloned().unwrap_or_else(Uuid::nil);
377 let version = txn.get_version(new_version_id).await?.unwrap();
378 assert_eq!(version.version_id, new_version_id);
379 assert_eq!(version.parent_version_id, parent_version_id);
380 assert_eq!(version.history_segment, expected_history);
381 } else {
382 panic!("did not get Ok from add_version: {add_version_result:?}");
383 }
384
385 assert_eq!(snapshot_urgency, expected_urgency);
386
387 Ok(())
388 }
389
390 #[test]
391 fn snapshot_urgency_max() {
392 use SnapshotUrgency::*;
393 assert_eq!(std::cmp::max(None, None), None);
394 assert_eq!(std::cmp::max(None, Low), Low);
395 assert_eq!(std::cmp::max(None, High), High);
396 assert_eq!(std::cmp::max(Low, None), Low);
397 assert_eq!(std::cmp::max(Low, Low), Low);
398 assert_eq!(std::cmp::max(Low, High), High);
399 assert_eq!(std::cmp::max(High, None), High);
400 assert_eq!(std::cmp::max(High, Low), High);
401 assert_eq!(std::cmp::max(High, High), High);
402 }
403
404 #[test]
405 fn snapshot_urgency_for_days() {
406 use SnapshotUrgency::*;
407 let config = ServerConfig::default();
408 assert_eq!(SnapshotUrgency::for_days(&config, 0), None);
409 assert_eq!(
410 SnapshotUrgency::for_days(&config, config.snapshot_days),
411 Low
412 );
413 assert_eq!(
414 SnapshotUrgency::for_days(&config, config.snapshot_days * 2),
415 High
416 );
417 }
418
419 #[test]
420 fn snapshot_urgency_for_versions_since() {
421 use SnapshotUrgency::*;
422 let config = ServerConfig::default();
423 assert_eq!(SnapshotUrgency::for_versions_since(&config, 0), None);
424 assert_eq!(
425 SnapshotUrgency::for_versions_since(&config, config.snapshot_versions),
426 Low
427 );
428 assert_eq!(
429 SnapshotUrgency::for_versions_since(&config, config.snapshot_versions * 2),
430 High
431 );
432 }
433
434 #[tokio::test]
435 async fn get_child_version_not_found_initial_nil() -> anyhow::Result<()> {
436 let (storage, client_id) = setup();
437 {
438 let mut txn = storage.txn(client_id).await?;
439 txn.new_client(NIL_VERSION_ID).await?;
440 txn.commit().await?;
441 }
442
443 let server = into_server(storage);
444
445 assert_eq!(
447 server.get_child_version(client_id, NIL_VERSION_ID).await?,
448 GetVersionResult::NotFound
449 );
450 Ok(())
451 }
452
453 #[tokio::test]
454 async fn get_child_version_not_found_initial_continuing() -> anyhow::Result<()> {
455 let (storage, client_id) = setup();
456 {
457 let mut txn = storage.txn(client_id).await?;
458 txn.new_client(NIL_VERSION_ID).await?;
459 txn.commit().await?;
460 }
461
462 let server = into_server(storage);
463
464 assert_eq!(
467 server.get_child_version(client_id, Uuid::new_v4(),).await?,
468 GetVersionResult::NotFound
469 );
470 Ok(())
471 }
472
473 #[tokio::test]
474 async fn get_child_version_not_found_up_to_date() -> anyhow::Result<()> {
475 let (storage, client_id) = setup();
476 let parent_version_id = Uuid::new_v4();
477 {
478 let mut txn = storage.txn(client_id).await?;
479 txn.new_client(parent_version_id).await?;
481 txn.add_version(parent_version_id, NIL_VERSION_ID, vec![])
482 .await?;
483 txn.commit().await?;
484 }
485
486 let server = into_server(storage);
487 assert_eq!(
488 server
489 .get_child_version(client_id, parent_version_id)
490 .await?,
491 GetVersionResult::NotFound
492 );
493 Ok(())
494 }
495
496 #[tokio::test]
497 async fn get_child_version_gone_not_latest() -> anyhow::Result<()> {
498 let (storage, client_id) = setup();
499 let parent_version_id = Uuid::new_v4();
500 {
501 let mut txn = storage.txn(client_id).await?;
502 txn.new_client(parent_version_id).await?;
504 txn.add_version(parent_version_id, NIL_VERSION_ID, vec![])
505 .await?;
506 txn.commit().await?;
507 }
508
509 let server = into_server(storage);
510 assert_eq!(
511 server.get_child_version(client_id, Uuid::new_v4(),).await?,
512 GetVersionResult::Gone
513 );
514 Ok(())
515 }
516
517 #[tokio::test]
518 async fn get_child_version_found() -> anyhow::Result<()> {
519 let (storage, client_id) = setup();
520 let version_id = Uuid::new_v4();
521 let parent_version_id = Uuid::new_v4();
522 let history_segment = b"abcd".to_vec();
523 {
524 let mut txn = storage.txn(client_id).await?;
525 txn.new_client(version_id).await?;
526 txn.add_version(version_id, parent_version_id, history_segment.clone())
527 .await?;
528 txn.commit().await?;
529 }
530
531 let server = into_server(storage);
532 assert_eq!(
533 server
534 .get_child_version(client_id, parent_version_id)
535 .await?,
536 GetVersionResult::Success {
537 version_id,
538 parent_version_id,
539 history_segment,
540 }
541 );
542 Ok(())
543 }
544
545 #[tokio::test]
546 async fn add_version_conflict() -> anyhow::Result<()> {
547 let (storage, client_id) = setup();
548 let versions = add_versions(&storage, client_id, 3, None, None).await?;
549
550 let server = into_server(storage);
552 assert_eq!(
553 server
554 .add_version(client_id, versions[1], vec![3, 6, 9])
555 .await?
556 .0,
557 AddVersionResult::ExpectedParentVersion(versions[2])
558 );
559
560 let mut txn = server.txn(client_id).await?;
562 assert_eq!(
563 txn.get_client().await?.unwrap().latest_version_id,
564 versions[2]
565 );
566 assert_eq!(txn.get_version_by_parent(versions[2]).await?, None);
567
568 Ok(())
569 }
570
571 #[tokio::test]
572 async fn add_version_with_existing_history() -> anyhow::Result<()> {
573 let (storage, client_id) = setup();
574 let versions = add_versions(&storage, client_id, 1, None, None).await?;
575
576 let server = into_server(storage);
577 let result = server
578 .add_version(client_id, versions[0], vec![3, 6, 9])
579 .await?;
580
581 av_success_check(
582 &server,
583 client_id,
584 &versions,
585 result,
586 vec![3, 6, 9],
587 SnapshotUrgency::High,
589 )
590 .await?;
591
592 Ok(())
593 }
594
595 #[tokio::test]
596 async fn add_version_with_no_history() -> anyhow::Result<()> {
597 let (storage, client_id) = setup();
598 let versions = add_versions(&storage, client_id, 0, None, None).await?;
599
600 let server = into_server(storage);
601 let parent_version_id = Uuid::nil();
602 let result = server
603 .add_version(client_id, parent_version_id, vec![3, 6, 9])
604 .await?;
605
606 av_success_check(
607 &server,
608 client_id,
609 &versions,
610 result,
611 vec![3, 6, 9],
612 SnapshotUrgency::High,
614 )
615 .await?;
616
617 Ok(())
618 }
619
620 #[tokio::test]
621 async fn add_version_success_recent_snapshot() -> anyhow::Result<()> {
622 let (storage, client_id) = setup();
623 let versions = add_versions(&storage, client_id, 1, Some(0), None).await?;
624
625 let server = into_server(storage);
626 let result = server
627 .add_version(client_id, versions[0], vec![1, 2, 3])
628 .await?;
629
630 av_success_check(
631 &server,
632 client_id,
633 &versions,
634 result,
635 vec![1, 2, 3],
636 SnapshotUrgency::None,
638 )
639 .await?;
640
641 Ok(())
642 }
643
644 #[tokio::test]
645 async fn add_version_success_aged_snapshot() -> anyhow::Result<()> {
646 let (storage, client_id) = setup();
648 let versions = add_versions(&storage, client_id, 1, Some(0), Some(50)).await?;
649
650 let server = into_server(storage);
651 let result = server
652 .add_version(client_id, versions[0], vec![1, 2, 3])
653 .await?;
654
655 av_success_check(
656 &server,
657 client_id,
658 &versions,
659 result,
660 vec![1, 2, 3],
661 SnapshotUrgency::High,
663 )
664 .await?;
665
666 Ok(())
667 }
668
669 #[tokio::test]
670 async fn add_version_success_snapshot_many_versions_ago() -> anyhow::Result<()> {
671 let (storage, client_id) = setup();
673 let versions = add_versions(&storage, client_id, 50, Some(0), None).await?;
674
675 let mut server = into_server(storage);
676 server.config.snapshot_versions = 30;
677
678 let result = server
679 .add_version(client_id, versions[49], vec![1, 2, 3])
680 .await?;
681
682 av_success_check(
683 &server,
684 client_id,
685 &versions,
686 result,
687 vec![1, 2, 3],
688 SnapshotUrgency::High,
690 )
691 .await?;
692
693 Ok(())
694 }
695
696 #[tokio::test]
697 async fn add_snapshot_success_latest() -> anyhow::Result<()> {
698 let (storage, client_id) = setup();
699 let version_id = Uuid::new_v4();
700
701 {
702 let mut txn = storage.txn(client_id).await?;
703 txn.new_client(version_id).await?;
705 txn.add_version(version_id, NIL_VERSION_ID, vec![]).await?;
706
707 txn.commit().await?;
708 }
709
710 let server = into_server(storage);
711 server
712 .add_snapshot(client_id, version_id, vec![1, 2, 3])
713 .await?;
714
715 let mut txn = server.txn(client_id).await?;
717 let client = txn.get_client().await?.unwrap();
718 let snapshot = client.snapshot.unwrap();
719 assert_eq!(snapshot.version_id, version_id);
720 assert_eq!(snapshot.versions_since, 0);
721 assert_eq!(
722 txn.get_snapshot_data(version_id).await.unwrap(),
723 Some(vec![1, 2, 3])
724 );
725
726 Ok(())
727 }
728
729 #[tokio::test]
730 async fn add_snapshot_success_older() -> anyhow::Result<()> {
731 let (storage, client_id) = setup();
732 let version_id_1 = Uuid::new_v4();
733 let version_id_2 = Uuid::new_v4();
734
735 {
736 let mut txn = storage.txn(client_id).await?;
737 txn.new_client(version_id_2).await?;
739 txn.add_version(version_id_1, NIL_VERSION_ID, vec![])
740 .await?;
741 txn.add_version(version_id_2, version_id_1, vec![]).await?;
742
743 txn.commit().await?;
744 }
745
746 let server = into_server(storage);
748 server
749 .add_snapshot(client_id, version_id_1, vec![1, 2, 3])
750 .await?;
751
752 let mut txn = server.txn(client_id).await?;
754 let client = txn.get_client().await?.unwrap();
755 let snapshot = client.snapshot.unwrap();
756 assert_eq!(snapshot.version_id, version_id_1);
757 assert_eq!(snapshot.versions_since, 0);
758 assert_eq!(
759 txn.get_snapshot_data(version_id_1).await.unwrap(),
760 Some(vec![1, 2, 3])
761 );
762
763 Ok(())
764 }
765
766 #[tokio::test]
767 async fn add_snapshot_fails_no_such() -> anyhow::Result<()> {
768 let (storage, client_id) = setup();
769 let version_id_1 = Uuid::new_v4();
770 let version_id_2 = Uuid::new_v4();
771
772 {
773 let mut txn = storage.txn(client_id).await?;
774 txn.new_client(version_id_2).await?;
776 txn.add_version(version_id_1, NIL_VERSION_ID, vec![])
777 .await?;
778 txn.add_version(version_id_2, version_id_1, vec![]).await?;
779
780 txn.commit().await?;
781 }
782
783 let server = into_server(storage);
785 let version_id_unk = Uuid::new_v4();
786 server
787 .add_snapshot(client_id, version_id_unk, vec![1, 2, 3])
788 .await?;
789
790 let mut txn = server.txn(client_id).await?;
792 let client = txn.get_client().await?.unwrap();
793 assert!(client.snapshot.is_none());
794
795 Ok(())
796 }
797
798 #[tokio::test]
799 async fn add_snapshot_fails_too_old() -> anyhow::Result<()> {
800 let (storage, client_id) = setup();
801 let mut version_id = Uuid::new_v4();
802 let mut parent_version_id = Uuid::nil();
803 let mut version_ids = vec![];
804
805 {
806 let mut txn = storage.txn(client_id).await?;
807 txn.new_client(Uuid::nil()).await?;
809 for _ in 0..10 {
810 txn.add_version(version_id, parent_version_id, vec![])
811 .await?;
812 version_ids.push(version_id);
813 parent_version_id = version_id;
814 version_id = Uuid::new_v4();
815 }
816
817 txn.commit().await?;
818 }
819
820 let server = into_server(storage);
822 server
823 .add_snapshot(client_id, version_ids[0], vec![1, 2, 3])
824 .await?;
825
826 let mut txn = server.txn(client_id).await?;
828 let client = txn.get_client().await?.unwrap();
829 assert!(client.snapshot.is_none());
830
831 Ok(())
832 }
833
834 #[tokio::test]
835 async fn add_snapshot_fails_newer_exists() -> anyhow::Result<()> {
836 let (storage, client_id) = setup();
837 let mut version_id = Uuid::new_v4();
838 let mut parent_version_id = Uuid::nil();
839 let mut version_ids = vec![];
840
841 {
842 let mut txn = storage.txn(client_id).await?;
843 txn.new_client(Uuid::nil()).await?;
846 for _ in 0..5 {
847 txn.add_version(version_id, parent_version_id, vec![])
848 .await?;
849 version_ids.push(version_id);
850 parent_version_id = version_id;
851 version_id = Uuid::new_v4();
852 }
853 txn.set_snapshot(
854 Snapshot {
855 version_id: version_ids[2],
856 versions_since: 2,
857 timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(),
858 },
859 vec![1, 2, 3],
860 )
861 .await?;
862
863 txn.commit().await?;
864 }
865
866 let server = into_server(storage);
868 server
869 .add_snapshot(client_id, version_ids[0], vec![9, 9, 9])
870 .await?;
871
872 let mut txn = server.txn(client_id).await?;
874 let client = txn.get_client().await?.unwrap();
875 let snapshot = client.snapshot.unwrap();
876 assert_eq!(snapshot.version_id, version_ids[2]);
877 assert_eq!(snapshot.versions_since, 2);
878 assert_eq!(
879 txn.get_snapshot_data(version_ids[2]).await.unwrap(),
880 Some(vec![1, 2, 3])
881 );
882
883 Ok(())
884 }
885
886 #[tokio::test]
887 async fn add_snapshot_fails_nil_version() -> anyhow::Result<()> {
888 let (storage, client_id) = setup();
889 {
890 let mut txn = storage.txn(client_id).await?;
891 txn.new_client(NIL_VERSION_ID).await?;
893 txn.commit().await?;
894 }
895
896 let server = into_server(storage);
897 server
898 .add_snapshot(client_id, NIL_VERSION_ID, vec![9, 9, 9])
899 .await?;
900
901 let mut txn = server.txn(client_id).await?;
903 let client = txn.get_client().await?.unwrap();
904 assert!(client.snapshot.is_none());
905
906 Ok(())
907 }
908
909 #[tokio::test]
910 async fn get_snapshot_found() -> anyhow::Result<()> {
911 let (storage, client_id) = setup();
912 let data = vec![1, 2, 3];
913 let snapshot_version_id = Uuid::new_v4();
914
915 {
916 let mut txn = storage.txn(client_id).await?;
917 txn.new_client(snapshot_version_id).await?;
918 txn.set_snapshot(
919 Snapshot {
920 version_id: snapshot_version_id,
921 versions_since: 3,
922 timestamp: Utc.with_ymd_and_hms(2001, 9, 9, 1, 46, 40).unwrap(),
923 },
924 data.clone(),
925 )
926 .await?;
927 txn.commit().await?;
928 }
929
930 let server = into_server(storage);
931 assert_eq!(
932 server.get_snapshot(client_id).await?,
933 Some((snapshot_version_id, data))
934 );
935
936 Ok(())
937 }
938
939 #[tokio::test]
940 async fn get_snapshot_not_found() -> anyhow::Result<()> {
941 let (storage, client_id) = setup();
942 {
943 let mut txn = storage.txn(client_id).await?;
944 txn.new_client(NIL_VERSION_ID).await?;
945 txn.commit().await?;
946 }
947
948 let server = into_server(storage);
949 assert_eq!(server.get_snapshot(client_id).await?, None);
950
951 Ok(())
952 }
953}