Skip to main content

syncular_runtime/storage/
compaction.rs

1use crate::app_schema::AppTableMetadata;
2use crate::error::{ErrorKind, Result, SyncularError};
3use crate::store::now_ms;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
7#[serde(default, rename_all = "camelCase")]
8pub struct StorageCompactionOptions {
9    pub older_than_ms: Option<i64>,
10    pub max_blob_cache_bytes: Option<i64>,
11    pub prune_acked_outbox: Option<bool>,
12    pub prune_resolved_conflicts: Option<bool>,
13    pub prune_failed_blob_uploads: Option<bool>,
14    pub prune_inactive_subscription_states: Option<bool>,
15    pub prune_tombstones: Option<bool>,
16    pub max_tombstone_server_version: Option<i64>,
17    pub prune_encrypted_crdt_updates: Option<bool>,
18    pub max_encrypted_crdt_checkpoints_per_stream: Option<i64>,
19    pub prune_crdt_update_log: Option<bool>,
20}
21
22#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
23#[serde(rename_all = "camelCase")]
24pub struct StorageCompactionReport {
25    pub acked_outbox_commits_deleted: i64,
26    pub resolved_conflicts_deleted: i64,
27    pub failed_blob_uploads_deleted: i64,
28    pub inactive_subscription_states_deleted: i64,
29    pub tombstone_rows_deleted: i64,
30    pub blob_cache_bytes_pruned: i64,
31    pub encrypted_crdt_updates_deleted: i64,
32    pub encrypted_crdt_checkpoints_deleted: i64,
33    pub crdt_update_log_deleted: i64,
34}
35
36impl StorageCompactionOptions {
37    pub fn from_json(options_json: Option<&str>) -> Result<Self> {
38        match options_json.map(str::trim) {
39            None | Some("") => Ok(Self::default()),
40            Some(value) => serde_json::from_str(value).map_err(SyncularError::protocol),
41        }
42    }
43
44    pub fn cutoff_ms(&self, now: i64) -> Result<Option<i64>> {
45        let Some(age) = self.older_than_ms else {
46            return Ok(None);
47        };
48        if age < 0 {
49            return Err(SyncularError::message(
50                ErrorKind::Config,
51                "storage compaction olderThanMs must be non-negative",
52            ));
53        }
54        Ok(Some(now.saturating_sub(age)))
55    }
56
57    pub fn cutoff_ms_now(&self) -> Result<Option<i64>> {
58        if self.older_than_ms.is_none() {
59            return Ok(None);
60        }
61        self.cutoff_ms(now_ms())
62    }
63
64    pub fn should_prune_acked_outbox(&self) -> bool {
65        self.prune_acked_outbox
66            .unwrap_or(self.older_than_ms.is_some())
67    }
68
69    pub fn should_prune_resolved_conflicts(&self) -> bool {
70        self.prune_resolved_conflicts
71            .unwrap_or(self.older_than_ms.is_some())
72    }
73
74    pub fn should_prune_failed_blob_uploads(&self) -> bool {
75        self.prune_failed_blob_uploads.unwrap_or(false)
76    }
77
78    pub fn should_prune_inactive_subscription_states(&self) -> bool {
79        self.prune_inactive_subscription_states.unwrap_or(false)
80    }
81
82    pub fn should_prune_tombstones(&self) -> bool {
83        self.prune_tombstones
84            .unwrap_or(self.max_tombstone_server_version.is_some())
85    }
86
87    pub fn should_prune_encrypted_crdt_updates(&self) -> bool {
88        self.prune_encrypted_crdt_updates.unwrap_or(false)
89    }
90
91    pub fn should_prune_crdt_update_log(&self) -> bool {
92        self.prune_crdt_update_log
93            .unwrap_or(self.older_than_ms.is_some())
94    }
95
96    pub fn encrypted_crdt_checkpoint_keep_count(&self) -> Result<Option<i64>> {
97        let Some(count) = self.max_encrypted_crdt_checkpoints_per_stream else {
98            return Ok(None);
99        };
100        if count < 1 {
101            return Err(SyncularError::message(
102                ErrorKind::Config,
103                "storage compaction maxEncryptedCrdtCheckpointsPerStream must be at least 1",
104            ));
105        }
106        Ok(Some(count))
107    }
108}
109
110pub fn tombstone_delete_statements(
111    metadata: &[AppTableMetadata],
112    max_server_version: i64,
113) -> Result<Vec<String>> {
114    metadata
115        .iter()
116        .filter_map(|metadata| {
117            metadata
118                .soft_delete_column
119                .map(|soft_delete_column| (metadata, soft_delete_column))
120        })
121        .map(|(metadata, soft_delete_column)| {
122            tombstone_delete_statement(metadata, soft_delete_column, max_server_version)
123        })
124        .collect()
125}
126
127pub fn tombstone_table_names(metadata: &[AppTableMetadata]) -> Vec<String> {
128    metadata
129        .iter()
130        .filter(|metadata| metadata.soft_delete_column.is_some())
131        .map(|metadata| metadata.name.to_string())
132        .collect()
133}
134
135pub fn required_compaction_cutoff(cutoff: Option<i64>, label: &str) -> Result<i64> {
136    cutoff.ok_or_else(|| {
137        SyncularError::config(format!(
138            "storage compaction for {label} requires olderThanMs"
139        ))
140    })
141}
142
143fn tombstone_delete_statement(
144    metadata: &AppTableMetadata,
145    soft_delete_column: &str,
146    max_server_version: i64,
147) -> Result<String> {
148    validate_sqlite_identifier(metadata.name)?;
149    validate_sqlite_identifier(soft_delete_column)?;
150    validate_sqlite_identifier(metadata.server_version_column)?;
151    Ok(format!(
152        "delete from {table} where {soft_delete_column} != 0 and {server_version_column} <= {max_server_version}",
153        table = metadata.name,
154        server_version_column = metadata.server_version_column,
155    ))
156}
157
158fn validate_sqlite_identifier(identifier: &str) -> Result<()> {
159    if identifier
160        .bytes()
161        .all(|byte| byte.is_ascii_alphanumeric() || byte == b'_')
162    {
163        Ok(())
164    } else {
165        Err(SyncularError::schema(format!(
166            "invalid sqlite identifier in storage compaction: {identifier}"
167        )))
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Options that carry only an age threshold.
    fn with_age(older_than_ms: i64) -> StorageCompactionOptions {
        StorageCompactionOptions {
            older_than_ms: Some(older_than_ms),
            ..StorageCompactionOptions::default()
        }
    }

    #[test]
    fn age_cutoff_enables_safe_default_cleanup() -> Result<()> {
        let options = with_age(1_000);

        assert_eq!(options.cutoff_ms(10_000)?, Some(9_000));

        // Passes gated on the age threshold switch on with it.
        assert!(options.should_prune_acked_outbox());
        assert!(options.should_prune_resolved_conflicts());
        assert!(options.should_prune_crdt_update_log());

        // Everything else stays opt-in.
        assert!(!options.should_prune_failed_blob_uploads());
        assert!(!options.should_prune_inactive_subscription_states());
        assert!(!options.should_prune_tombstones());
        assert!(!options.should_prune_encrypted_crdt_updates());
        Ok(())
    }

    #[test]
    fn default_options_enable_nothing() -> Result<()> {
        let options = StorageCompactionOptions::default();

        assert_eq!(options.cutoff_ms(10_000)?, None);
        assert_eq!(options.encrypted_crdt_checkpoint_keep_count()?, None);
        assert!(!options.should_prune_acked_outbox());
        assert!(!options.should_prune_resolved_conflicts());
        assert!(!options.should_prune_crdt_update_log());
        assert!(!options.should_prune_tombstones());
        Ok(())
    }

    #[test]
    fn explicit_flag_overrides_age_default() {
        let options = StorageCompactionOptions {
            prune_acked_outbox: Some(false),
            ..with_age(1_000)
        };

        assert!(!options.should_prune_acked_outbox());
        // Flags that were not overridden still follow the age threshold.
        assert!(options.should_prune_resolved_conflicts());
    }

    #[test]
    fn negative_age_threshold_is_rejected() {
        assert!(with_age(-1).cutoff_ms(10_000).is_err());
    }

    #[test]
    fn checkpoint_keep_count_must_be_at_least_one() -> Result<()> {
        let zero = StorageCompactionOptions {
            max_encrypted_crdt_checkpoints_per_stream: Some(0),
            ..StorageCompactionOptions::default()
        };
        assert!(zero.encrypted_crdt_checkpoint_keep_count().is_err());

        let one = StorageCompactionOptions {
            max_encrypted_crdt_checkpoints_per_stream: Some(1),
            ..StorageCompactionOptions::default()
        };
        assert_eq!(one.encrypted_crdt_checkpoint_keep_count()?, Some(1));
        Ok(())
    }

    #[test]
    fn tombstone_cleanup_requires_server_version_bound() {
        let options = StorageCompactionOptions {
            max_tombstone_server_version: Some(42),
            ..StorageCompactionOptions::default()
        };

        assert!(options.should_prune_tombstones());
        let statements =
            tombstone_delete_statements(crate::fixtures::todo::generated::APP_TABLE_METADATA, 42)
                .expect("statements");
        assert!(statements
            .iter()
            .any(|statement| statement.contains("delete from comments")));
        assert!(statements
            .iter()
            .all(|statement| statement.contains("server_version <= 42")));
    }
}