syncular-runtime 0.1.0

Shared Rust runtime for Syncular SQLite-backed native and browser clients.
Documentation
use crate::app_schema::AppTableMetadata;
use crate::error::{ErrorKind, Result, SyncularError};
use crate::store::now_ms;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default, rename_all = "camelCase")]
pub struct StorageCompactionOptions {
    pub older_than_ms: Option<i64>,
    pub max_blob_cache_bytes: Option<i64>,
    pub prune_acked_outbox: Option<bool>,
    pub prune_resolved_conflicts: Option<bool>,
    pub prune_failed_blob_uploads: Option<bool>,
    pub prune_inactive_subscription_states: Option<bool>,
    pub prune_tombstones: Option<bool>,
    pub max_tombstone_server_version: Option<i64>,
    pub prune_encrypted_crdt_updates: Option<bool>,
    pub max_encrypted_crdt_checkpoints_per_stream: Option<i64>,
    pub prune_crdt_update_log: Option<bool>,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct StorageCompactionReport {
    pub acked_outbox_commits_deleted: i64,
    pub resolved_conflicts_deleted: i64,
    pub failed_blob_uploads_deleted: i64,
    pub inactive_subscription_states_deleted: i64,
    pub tombstone_rows_deleted: i64,
    pub blob_cache_bytes_pruned: i64,
    pub encrypted_crdt_updates_deleted: i64,
    pub encrypted_crdt_checkpoints_deleted: i64,
    pub crdt_update_log_deleted: i64,
}

impl StorageCompactionOptions {
    pub fn from_json(options_json: Option<&str>) -> Result<Self> {
        match options_json.map(str::trim) {
            None | Some("") => Ok(Self::default()),
            Some(value) => serde_json::from_str(value).map_err(SyncularError::protocol),
        }
    }

    pub fn cutoff_ms(&self, now: i64) -> Result<Option<i64>> {
        let Some(age) = self.older_than_ms else {
            return Ok(None);
        };
        if age < 0 {
            return Err(SyncularError::message(
                ErrorKind::Config,
                "storage compaction olderThanMs must be non-negative",
            ));
        }
        Ok(Some(now.saturating_sub(age)))
    }

    pub fn cutoff_ms_now(&self) -> Result<Option<i64>> {
        if self.older_than_ms.is_none() {
            return Ok(None);
        }
        self.cutoff_ms(now_ms())
    }

    pub fn should_prune_acked_outbox(&self) -> bool {
        self.prune_acked_outbox
            .unwrap_or(self.older_than_ms.is_some())
    }

    pub fn should_prune_resolved_conflicts(&self) -> bool {
        self.prune_resolved_conflicts
            .unwrap_or(self.older_than_ms.is_some())
    }

    pub fn should_prune_failed_blob_uploads(&self) -> bool {
        self.prune_failed_blob_uploads.unwrap_or(false)
    }

    pub fn should_prune_inactive_subscription_states(&self) -> bool {
        self.prune_inactive_subscription_states.unwrap_or(false)
    }

    pub fn should_prune_tombstones(&self) -> bool {
        self.prune_tombstones
            .unwrap_or(self.max_tombstone_server_version.is_some())
    }

    pub fn should_prune_encrypted_crdt_updates(&self) -> bool {
        self.prune_encrypted_crdt_updates.unwrap_or(false)
    }

    pub fn should_prune_crdt_update_log(&self) -> bool {
        self.prune_crdt_update_log
            .unwrap_or(self.older_than_ms.is_some())
    }

    pub fn encrypted_crdt_checkpoint_keep_count(&self) -> Result<Option<i64>> {
        let Some(count) = self.max_encrypted_crdt_checkpoints_per_stream else {
            return Ok(None);
        };
        if count < 1 {
            return Err(SyncularError::message(
                ErrorKind::Config,
                "storage compaction maxEncryptedCrdtCheckpointsPerStream must be at least 1",
            ));
        }
        Ok(Some(count))
    }
}

pub fn tombstone_delete_statements(
    metadata: &[AppTableMetadata],
    max_server_version: i64,
) -> Result<Vec<String>> {
    metadata
        .iter()
        .filter_map(|metadata| {
            metadata
                .soft_delete_column
                .map(|soft_delete_column| (metadata, soft_delete_column))
        })
        .map(|(metadata, soft_delete_column)| {
            tombstone_delete_statement(metadata, soft_delete_column, max_server_version)
        })
        .collect()
}

pub fn tombstone_table_names(metadata: &[AppTableMetadata]) -> Vec<String> {
    metadata
        .iter()
        .filter(|metadata| metadata.soft_delete_column.is_some())
        .map(|metadata| metadata.name.to_string())
        .collect()
}

pub fn required_compaction_cutoff(cutoff: Option<i64>, label: &str) -> Result<i64> {
    cutoff.ok_or_else(|| {
        SyncularError::config(format!(
            "storage compaction for {label} requires olderThanMs"
        ))
    })
}

fn tombstone_delete_statement(
    metadata: &AppTableMetadata,
    soft_delete_column: &str,
    max_server_version: i64,
) -> Result<String> {
    validate_sqlite_identifier(metadata.name)?;
    validate_sqlite_identifier(soft_delete_column)?;
    validate_sqlite_identifier(metadata.server_version_column)?;
    Ok(format!(
        "delete from {table} where {soft_delete_column} != 0 and {server_version_column} <= {max_server_version}",
        table = metadata.name,
        server_version_column = metadata.server_version_column,
    ))
}

fn validate_sqlite_identifier(identifier: &str) -> Result<()> {
    if identifier
        .bytes()
        .all(|byte| byte.is_ascii_alphanumeric() || byte == b'_')
    {
        Ok(())
    } else {
        Err(SyncularError::schema(format!(
            "invalid sqlite identifier in storage compaction: {identifier}"
        )))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn age_cutoff_enables_safe_default_cleanup() -> Result<()> {
        let options = StorageCompactionOptions {
            older_than_ms: Some(1_000),
            ..StorageCompactionOptions::default()
        };

        assert_eq!(options.cutoff_ms(10_000)?, Some(9_000));
        assert!(options.should_prune_acked_outbox());
        assert!(options.should_prune_resolved_conflicts());
        assert!(!options.should_prune_failed_blob_uploads());
        assert!(!options.should_prune_inactive_subscription_states());
        assert!(!options.should_prune_tombstones());
        assert!(!options.should_prune_encrypted_crdt_updates());
        Ok(())
    }

    #[test]
    fn tombstone_cleanup_requires_server_version_bound() {
        let options = StorageCompactionOptions {
            max_tombstone_server_version: Some(42),
            ..StorageCompactionOptions::default()
        };

        assert!(options.should_prune_tombstones());
        let statements =
            tombstone_delete_statements(crate::fixtures::todo::generated::APP_TABLE_METADATA, 42)
                .expect("statements");
        assert!(statements
            .iter()
            .any(|statement| statement.contains("delete from comments")));
        assert!(statements
            .iter()
            .all(|statement| statement.contains("server_version <= 42")));
    }
}