syncular_runtime/storage/
compaction.rs1use crate::app_schema::AppTableMetadata;
2use crate::error::{ErrorKind, Result, SyncularError};
3use crate::store::now_ms;
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
7#[serde(default, rename_all = "camelCase")]
8pub struct StorageCompactionOptions {
9 pub older_than_ms: Option<i64>,
10 pub max_blob_cache_bytes: Option<i64>,
11 pub prune_acked_outbox: Option<bool>,
12 pub prune_resolved_conflicts: Option<bool>,
13 pub prune_failed_blob_uploads: Option<bool>,
14 pub prune_inactive_subscription_states: Option<bool>,
15 pub prune_tombstones: Option<bool>,
16 pub max_tombstone_server_version: Option<i64>,
17 pub prune_encrypted_crdt_updates: Option<bool>,
18 pub max_encrypted_crdt_checkpoints_per_stream: Option<i64>,
19 pub prune_crdt_update_log: Option<bool>,
20}
21
22#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
23#[serde(rename_all = "camelCase")]
24pub struct StorageCompactionReport {
25 pub acked_outbox_commits_deleted: i64,
26 pub resolved_conflicts_deleted: i64,
27 pub failed_blob_uploads_deleted: i64,
28 pub inactive_subscription_states_deleted: i64,
29 pub tombstone_rows_deleted: i64,
30 pub blob_cache_bytes_pruned: i64,
31 pub encrypted_crdt_updates_deleted: i64,
32 pub encrypted_crdt_checkpoints_deleted: i64,
33 pub crdt_update_log_deleted: i64,
34}
35
36impl StorageCompactionOptions {
37 pub fn from_json(options_json: Option<&str>) -> Result<Self> {
38 match options_json.map(str::trim) {
39 None | Some("") => Ok(Self::default()),
40 Some(value) => serde_json::from_str(value).map_err(SyncularError::protocol),
41 }
42 }
43
44 pub fn cutoff_ms(&self, now: i64) -> Result<Option<i64>> {
45 let Some(age) = self.older_than_ms else {
46 return Ok(None);
47 };
48 if age < 0 {
49 return Err(SyncularError::message(
50 ErrorKind::Config,
51 "storage compaction olderThanMs must be non-negative",
52 ));
53 }
54 Ok(Some(now.saturating_sub(age)))
55 }
56
57 pub fn cutoff_ms_now(&self) -> Result<Option<i64>> {
58 if self.older_than_ms.is_none() {
59 return Ok(None);
60 }
61 self.cutoff_ms(now_ms())
62 }
63
64 pub fn should_prune_acked_outbox(&self) -> bool {
65 self.prune_acked_outbox
66 .unwrap_or(self.older_than_ms.is_some())
67 }
68
69 pub fn should_prune_resolved_conflicts(&self) -> bool {
70 self.prune_resolved_conflicts
71 .unwrap_or(self.older_than_ms.is_some())
72 }
73
74 pub fn should_prune_failed_blob_uploads(&self) -> bool {
75 self.prune_failed_blob_uploads.unwrap_or(false)
76 }
77
78 pub fn should_prune_inactive_subscription_states(&self) -> bool {
79 self.prune_inactive_subscription_states.unwrap_or(false)
80 }
81
82 pub fn should_prune_tombstones(&self) -> bool {
83 self.prune_tombstones
84 .unwrap_or(self.max_tombstone_server_version.is_some())
85 }
86
87 pub fn should_prune_encrypted_crdt_updates(&self) -> bool {
88 self.prune_encrypted_crdt_updates.unwrap_or(false)
89 }
90
91 pub fn should_prune_crdt_update_log(&self) -> bool {
92 self.prune_crdt_update_log
93 .unwrap_or(self.older_than_ms.is_some())
94 }
95
96 pub fn encrypted_crdt_checkpoint_keep_count(&self) -> Result<Option<i64>> {
97 let Some(count) = self.max_encrypted_crdt_checkpoints_per_stream else {
98 return Ok(None);
99 };
100 if count < 1 {
101 return Err(SyncularError::message(
102 ErrorKind::Config,
103 "storage compaction maxEncryptedCrdtCheckpointsPerStream must be at least 1",
104 ));
105 }
106 Ok(Some(count))
107 }
108}
109
110pub fn tombstone_delete_statements(
111 metadata: &[AppTableMetadata],
112 max_server_version: i64,
113) -> Result<Vec<String>> {
114 metadata
115 .iter()
116 .filter_map(|metadata| {
117 metadata
118 .soft_delete_column
119 .map(|soft_delete_column| (metadata, soft_delete_column))
120 })
121 .map(|(metadata, soft_delete_column)| {
122 tombstone_delete_statement(metadata, soft_delete_column, max_server_version)
123 })
124 .collect()
125}
126
127pub fn tombstone_table_names(metadata: &[AppTableMetadata]) -> Vec<String> {
128 metadata
129 .iter()
130 .filter(|metadata| metadata.soft_delete_column.is_some())
131 .map(|metadata| metadata.name.to_string())
132 .collect()
133}
134
135pub fn required_compaction_cutoff(cutoff: Option<i64>, label: &str) -> Result<i64> {
136 cutoff.ok_or_else(|| {
137 SyncularError::config(format!(
138 "storage compaction for {label} requires olderThanMs"
139 ))
140 })
141}
142
143fn tombstone_delete_statement(
144 metadata: &AppTableMetadata,
145 soft_delete_column: &str,
146 max_server_version: i64,
147) -> Result<String> {
148 validate_sqlite_identifier(metadata.name)?;
149 validate_sqlite_identifier(soft_delete_column)?;
150 validate_sqlite_identifier(metadata.server_version_column)?;
151 Ok(format!(
152 "delete from {table} where {soft_delete_column} != 0 and {server_version_column} <= {max_server_version}",
153 table = metadata.name,
154 server_version_column = metadata.server_version_column,
155 ))
156}
157
158fn validate_sqlite_identifier(identifier: &str) -> Result<()> {
159 if identifier
160 .bytes()
161 .all(|byte| byte.is_ascii_alphanumeric() || byte == b'_')
162 {
163 Ok(())
164 } else {
165 Err(SyncularError::schema(format!(
166 "invalid sqlite identifier in storage compaction: {identifier}"
167 )))
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174
175 #[test]
176 fn age_cutoff_enables_safe_default_cleanup() -> Result<()> {
177 let options = StorageCompactionOptions {
178 older_than_ms: Some(1_000),
179 ..StorageCompactionOptions::default()
180 };
181
182 assert_eq!(options.cutoff_ms(10_000)?, Some(9_000));
183 assert!(options.should_prune_acked_outbox());
184 assert!(options.should_prune_resolved_conflicts());
185 assert!(!options.should_prune_failed_blob_uploads());
186 assert!(!options.should_prune_inactive_subscription_states());
187 assert!(!options.should_prune_tombstones());
188 assert!(!options.should_prune_encrypted_crdt_updates());
189 Ok(())
190 }
191
192 #[test]
193 fn tombstone_cleanup_requires_server_version_bound() {
194 let options = StorageCompactionOptions {
195 max_tombstone_server_version: Some(42),
196 ..StorageCompactionOptions::default()
197 };
198
199 assert!(options.should_prune_tombstones());
200 let statements =
201 tombstone_delete_statements(crate::fixtures::todo::generated::APP_TABLE_METADATA, 42)
202 .expect("statements");
203 assert!(statements
204 .iter()
205 .any(|statement| statement.contains("delete from comments")));
206 assert!(statements
207 .iter()
208 .all(|statement| statement.contains("server_version <= 42")));
209 }
210}