use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use crate::error::PlatformError;
use crate::traits::Storage;
/// Key prefix reserved for this module's bookkeeping. Writes and deletes under
/// it bypass the changelog, and such keys are filtered out of listings made
/// with a prefix that does not itself start with this one.
const SYNC_PREFIX: &str = "_sync/";
/// Key prefix under which serialized changelog entries are stored (see `log_key`).
const LOG_PREFIX: &str = "_sync/log/";
/// Key holding the serialized counter for the next sequence number to hand out.
const NEXT_SEQ_KEY: &str = "_sync/meta/next_seq";
/// One recorded mutation of a user key, as stored in the changelog.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ChangeEntry {
/// Sequence number assigned to the mutation when it was logged.
pub seq: u64,
/// Storage key that was written or deleted.
pub key: String,
/// Bytes written by a store, or `None` when the entry records a delete.
pub value: Option<Vec<u8>>,
}
/// A batch of [`ChangeEntry`] records, as returned by `export_changeset` and
/// consumed by `apply_changeset`.
pub type Changeset = Vec<ChangeEntry>;
/// Storage wrapper that records every store/delete of a non-`_sync/` key as a
/// [`ChangeEntry`] kept in the wrapped backend itself.
pub struct SyncableStorage<S: Storage> {
/// Wrapped backend; holds both the user data and the `_sync/` bookkeeping keys.
inner: S,
/// Held by `store` and `delete` across the data write, sequence allocation
/// and log append, so those three steps do not interleave between callers.
seq_lock: Mutex<()>,
}
impl<S: Storage> SyncableStorage<S> {
#[must_use]
pub fn new(inner: S) -> Self {
Self {
inner,
seq_lock: Mutex::new(()),
}
}
pub async fn next_seq(&self) -> Result<u64, PlatformError> {
self.inner
.retrieve(NEXT_SEQ_KEY)
.await?
.map_or(Ok(0), |bytes| {
rmp_serde::from_slice(&bytes).map_err(|e| {
PlatformError::StorageError(format!("failed to deserialize next_seq: {e}"))
})
})
}
pub async fn export_changeset(&self, since: u64) -> Result<Changeset, PlatformError> {
let all_log_keys = self.inner.list_keys(LOG_PREFIX).await?;
let since_key = log_key(since);
let mut entries = Vec::new();
for key in all_log_keys {
if key >= since_key
&& let Some(bytes) = self.inner.retrieve(&key).await?
{
let entry: ChangeEntry = rmp_serde::from_slice(&bytes).map_err(|e| {
PlatformError::StorageError(format!(
"failed to deserialize changelog entry {key}: {e}"
))
})?;
entries.push(entry);
}
}
Ok(entries)
}
const SYNC_DENIED_PREFIXES: &'static [&'static str] =
&["identity/", "mls/", "_meta/", "_sync/"];
pub async fn apply_changeset(&self, changeset: Changeset) -> Result<(), PlatformError> {
for entry in &changeset {
if Self::SYNC_DENIED_PREFIXES
.iter()
.any(|p| entry.key.starts_with(p))
{
return Err(PlatformError::StorageError(format!(
"changeset key '{}' targets a protected namespace",
entry.key
)));
}
}
for entry in changeset {
match &entry.value {
Some(data) => {
self.store(&entry.key, data).await?;
}
None => {
self.delete(&entry.key).await?;
}
}
}
Ok(())
}
async fn allocate_seq(&self) -> Result<u64, PlatformError> {
let seq = self.next_seq().await?;
let next = seq + 1;
let encoded = rmp_serde::to_vec(&next).map_err(|e| {
PlatformError::StorageError(format!("failed to serialize next_seq: {e}"))
})?;
self.inner.store(NEXT_SEQ_KEY, &encoded).await?;
Ok(seq)
}
async fn append_log(&self, entry: &ChangeEntry) -> Result<(), PlatformError> {
let key = log_key(entry.seq);
let encoded = rmp_serde::to_vec(entry).map_err(|e| {
PlatformError::StorageError(format!("failed to serialize changelog entry: {e}"))
})?;
self.inner.store(&key, &encoded).await
}
}
/// Builds the storage key for the changelog entry with sequence number `seq`.
///
/// The number is zero-padded to 20 digits (the width of `u64::MAX`), so the
/// lexicographic order of keys matches the numeric order of sequence numbers.
fn log_key(seq: u64) -> String {
    let padded = format!("{seq:020}");
    [LOG_PREFIX, padded.as_str()].concat()
}
#[allow(clippy::manual_async_fn)]
impl<S: Storage> Storage for SyncableStorage<S> {
    /// Writes `data` under `key`.
    ///
    /// Keys under `_sync/` go straight to the inner storage. Any other key is
    /// written, then assigned a sequence number and appended to the changelog,
    /// all while holding `seq_lock`.
    fn store(
        &self,
        key: &str,
        data: &[u8],
    ) -> impl Future<Output = Result<(), PlatformError>> + Send {
        let owned_key = key.to_owned();
        let payload = data.to_vec();
        async move {
            if owned_key.starts_with(SYNC_PREFIX) {
                // Bookkeeping keys are never logged.
                self.inner.store(&owned_key, &payload).await
            } else {
                let _seq_guard = self.seq_lock.lock().await;
                self.inner.store(&owned_key, &payload).await?;
                let record = ChangeEntry {
                    seq: self.allocate_seq().await?,
                    key: owned_key,
                    value: Some(payload),
                };
                self.append_log(&record).await
            }
        }
    }

    /// Reads `key` from the inner storage unchanged.
    fn retrieve(
        &self,
        key: &str,
    ) -> impl Future<Output = Result<Option<Vec<u8>>, PlatformError>> + Send {
        let owned_key = key.to_owned();
        async move { self.inner.retrieve(&owned_key).await }
    }

    /// Deletes `key`.
    ///
    /// Keys under `_sync/` go straight to the inner storage. Any other key is
    /// deleted, then a changelog entry with `value: None` is appended, all
    /// while holding `seq_lock`.
    fn delete(&self, key: &str) -> impl Future<Output = Result<(), PlatformError>> + Send {
        let owned_key = key.to_owned();
        async move {
            if owned_key.starts_with(SYNC_PREFIX) {
                // Bookkeeping keys are never logged.
                self.inner.delete(&owned_key).await
            } else {
                let _seq_guard = self.seq_lock.lock().await;
                self.inner.delete(&owned_key).await?;
                let record = ChangeEntry {
                    seq: self.allocate_seq().await?,
                    key: owned_key,
                    value: None,
                };
                self.append_log(&record).await
            }
        }
    }

    /// Lists keys starting with `prefix`.
    ///
    /// Unless `prefix` itself starts with `_sync/`, keys under `_sync/` are
    /// removed from the result.
    fn list_keys(
        &self,
        prefix: &str,
    ) -> impl Future<Output = Result<Vec<String>, PlatformError>> + Send {
        let wanted = prefix.to_owned();
        async move {
            let mut found = self.inner.list_keys(&wanted).await?;
            if !wanted.starts_with(SYNC_PREFIX) {
                found.retain(|name| !name.starts_with(SYNC_PREFIX));
            }
            Ok(found)
        }
    }

    /// Deletes every key starting with `prefix` and returns how many were deleted.
    ///
    /// A `_sync/` prefix is delegated to the inner storage. Otherwise each
    /// matching non-`_sync/` key is deleted through [`Self::delete`], so every
    /// removal gets its own changelog entry.
    fn delete_prefix(
        &self,
        prefix: &str,
    ) -> impl Future<Output = Result<u64, PlatformError>> + Send {
        let wanted = prefix.to_owned();
        async move {
            if wanted.starts_with(SYNC_PREFIX) {
                return self.inner.delete_prefix(&wanted).await;
            }
            let candidates = self.inner.list_keys(&wanted).await?;
            let mut removed: u64 = 0;
            for name in candidates
                .into_iter()
                .filter(|name| !name.starts_with(SYNC_PREFIX))
            {
                self.delete(&name).await?;
                removed += 1;
            }
            Ok(removed)
        }
    }

    /// Reports whether `key` exists in the inner storage.
    fn exists(&self, key: &str) -> impl Future<Output = Result<bool, PlatformError>> + Send {
        let owned_key = key.to_owned();
        async move { self.inner.exists(&owned_key).await }
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::testing::InMemoryStorage;
    extern crate self as scp_platform;
    scp_testing::storage_conformance!(SyncableStorage::new(InMemoryStorage::new()));

    /// Three stores produce three entries with consecutive sequence numbers.
    #[tokio::test]
    async fn export_after_mutations() {
        let storage = SyncableStorage::new(InMemoryStorage::new());
        storage.store("a", b"1").await.unwrap();
        storage.store("b", b"2").await.unwrap();
        storage.store("c", b"3").await.unwrap();

        let exported = storage.export_changeset(0).await.unwrap();
        let expected = vec![
            ChangeEntry {
                seq: 0,
                key: "a".to_owned(),
                value: Some(b"1".to_vec()),
            },
            ChangeEntry {
                seq: 1,
                key: "b".to_owned(),
                value: Some(b"2".to_vec()),
            },
            ChangeEntry {
                seq: 2,
                key: "c".to_owned(),
                value: Some(b"3".to_vec()),
            },
        ];
        assert_eq!(exported, expected);
    }

    /// `since` excludes entries with a lower sequence number.
    #[tokio::test]
    async fn export_since_filters() {
        let storage = SyncableStorage::new(InMemoryStorage::new());
        let writes: [(&str, &[u8]); 3] = [("a", b"1"), ("b", b"2"), ("c", b"3")];
        for (key, value) in writes {
            storage.store(key, value).await.unwrap();
        }

        let exported = storage.export_changeset(2).await.unwrap();
        assert_eq!(exported.len(), 1);
        let only = exported.first().unwrap();
        assert_eq!(only.seq, 2);
        assert_eq!(only.key, "c");
    }

    /// Replaying an exported changeset on a fresh store reproduces the final state.
    #[tokio::test]
    async fn apply_changeset_roundtrip() {
        let source = SyncableStorage::new(InMemoryStorage::new());
        source.store("key1", b"val1").await.unwrap();
        source.store("key2", b"val2").await.unwrap();
        source.delete("key1").await.unwrap();
        let exported = source.export_changeset(0).await.unwrap();

        let replica = SyncableStorage::new(InMemoryStorage::new());
        replica.apply_changeset(exported).await.unwrap();

        assert!(replica.retrieve("key1").await.unwrap().is_none());
        assert_eq!(
            replica.retrieve("key2").await.unwrap().as_deref(),
            Some(b"val2".as_slice())
        );
    }

    /// When a key is written several times, the last write wins after replay.
    #[tokio::test]
    async fn apply_changeset_lww() {
        let source = SyncableStorage::new(InMemoryStorage::new());
        let versions: [&[u8]; 3] = [b"first", b"second", b"third"];
        for version in versions {
            source.store("key", version).await.unwrap();
        }
        let exported = source.export_changeset(0).await.unwrap();

        let replica = SyncableStorage::new(InMemoryStorage::new());
        replica.apply_changeset(exported).await.unwrap();

        let replayed = replica.retrieve("key").await.unwrap();
        assert_eq!(replayed, Some(b"third".to_vec()));
    }

    /// A fresh store reports sequence number zero.
    #[tokio::test]
    async fn next_seq_starts_at_zero() {
        let storage = SyncableStorage::new(InMemoryStorage::new());
        let initial = storage.next_seq().await.unwrap();
        assert_eq!(initial, 0);
    }

    /// Each logged store advances the counter by one.
    #[tokio::test]
    async fn next_seq_increments() {
        let storage = SyncableStorage::new(InMemoryStorage::new());
        let writes: [(&str, &[u8]); 2] = [("a", b"1"), ("b", b"2")];
        for (expected_next, (key, value)) in (1u64..).zip(writes) {
            storage.store(key, value).await.unwrap();
            assert_eq!(storage.next_seq().await.unwrap(), expected_next);
        }
    }

    /// A delete is logged as an entry with no value.
    #[tokio::test]
    async fn delete_logged_in_changelog() {
        let storage = SyncableStorage::new(InMemoryStorage::new());
        storage.store("key", b"value").await.unwrap();
        storage.delete("key").await.unwrap();

        let exported = storage.export_changeset(0).await.unwrap();
        assert_eq!(exported.len(), 2);
        let (stored, deleted) = (&exported[0], &exported[1]);
        assert_eq!(stored.value, Some(b"value".to_vec()));
        assert_eq!(deleted.key, "key");
        assert_eq!(deleted.value, None);
    }

    /// Only the user key appears in the export, not the bookkeeping writes.
    #[tokio::test]
    async fn sync_keys_not_in_user_changelog() {
        let storage = SyncableStorage::new(InMemoryStorage::new());
        storage.store("user_key", b"data").await.unwrap();

        let exported = storage.export_changeset(0).await.unwrap();
        let logged_keys: Vec<&str> = exported.iter().map(|entry| entry.key.as_str()).collect();
        assert_eq!(logged_keys, ["user_key"]);
    }
}