Skip to main content

syncular_runtime/core/
crdt_field.rs

1use crate::app_schema::{AppSchema, AppTableMetadata, ColumnMetadata, CrdtYjsFieldMetadata};
2use crate::crdt_yjs::{YjsFieldKind, YjsFieldRule};
3use crate::encrypted_crdt::is_encrypted_update_log_field;
4use crate::error::{Result, SyncularError};
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
8#[serde(rename_all = "camelCase")]
9pub struct CrdtFieldId {
10    pub table: String,
11    pub row_id: String,
12    pub field: String,
13}
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "kebab-case")]
17pub enum CrdtFieldSyncMode {
18    ServerMerge,
19    EncryptedUpdateLog,
20}
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(rename_all = "kebab-case")]
24pub enum CrdtUpdateOrigin {
25    Local,
26    Remote,
27    Compaction,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "kebab-case")]
32pub enum CrdtUpdateStatus {
33    Pending,
34    Flushed,
35    Acked,
36    Pruned,
37}
38
39#[derive(Debug, Clone)]
40pub struct CrdtField {
41    id: CrdtFieldId,
42    metadata: &'static AppTableMetadata,
43    field: &'static CrdtYjsFieldMetadata,
44    sync_mode: CrdtFieldSyncMode,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[serde(rename_all = "camelCase")]
49pub struct CrdtDocumentSnapshot {
50    pub document_key: String,
51    pub table: String,
52    pub row_id: String,
53    pub field: String,
54    pub state_column: String,
55    pub sync_mode: CrdtFieldSyncMode,
56    pub state_base64: Option<String>,
57    pub state_vector_base64: String,
58    pub pending_updates: i64,
59    pub flushed_updates: i64,
60    pub acked_updates: i64,
61    pub log_updates: i64,
62    pub updated_at: i64,
63    pub compacted_at: Option<i64>,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
67#[serde(rename_all = "camelCase")]
68pub struct CrdtFieldCompactionStats {
69    pub pending_updates: i64,
70    pub flushed_updates: i64,
71    pub acked_updates: i64,
72    pub log_updates: i64,
73    pub state_vector_base64: String,
74    pub updated_at: i64,
75    pub compacted_at: Option<i64>,
76}
77
78#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "camelCase")]
80pub struct CrdtUpdateLogEntry {
81    pub id: i64,
82    pub document_key: String,
83    pub update_id: String,
84    pub client_commit_id: Option<String>,
85    pub origin: CrdtUpdateOrigin,
86    pub status: CrdtUpdateStatus,
87    pub update_base64: String,
88    pub state_vector_base64: String,
89    pub created_at: i64,
90    pub flushed_at: Option<i64>,
91    pub acked_at: Option<i64>,
92}
93
94impl From<&CrdtDocumentSnapshot> for CrdtFieldCompactionStats {
95    fn from(snapshot: &CrdtDocumentSnapshot) -> Self {
96        Self {
97            pending_updates: snapshot.pending_updates,
98            flushed_updates: snapshot.flushed_updates,
99            acked_updates: snapshot.acked_updates,
100            log_updates: snapshot.log_updates,
101            state_vector_base64: snapshot.state_vector_base64.clone(),
102            updated_at: snapshot.updated_at,
103            compacted_at: snapshot.compacted_at,
104        }
105    }
106}
107
108impl CrdtFieldId {
109    pub fn new(
110        table: impl Into<String>,
111        row_id: impl Into<String>,
112        field: impl Into<String>,
113    ) -> Self {
114        Self {
115            table: table.into(),
116            row_id: row_id.into(),
117            field: field.into(),
118        }
119    }
120}
121
122impl CrdtField {
123    pub fn id(&self) -> &CrdtFieldId {
124        &self.id
125    }
126
127    pub fn table(&self) -> &'static str {
128        self.metadata.name
129    }
130
131    pub fn row_id(&self) -> &str {
132        &self.id.row_id
133    }
134
135    pub fn field(&self) -> &'static str {
136        self.field.field
137    }
138
139    pub fn state_column(&self) -> &'static str {
140        self.field.state_column
141    }
142
143    pub fn container_key(&self) -> &'static str {
144        self.field.container_key
145    }
146
147    pub fn row_id_field(&self) -> &'static str {
148        self.field.row_id_field
149    }
150
151    pub fn sync_mode(&self) -> CrdtFieldSyncMode {
152        self.sync_mode
153    }
154
155    pub fn metadata(&self) -> &'static AppTableMetadata {
156        self.metadata
157    }
158
159    pub fn field_metadata(&self) -> &'static CrdtYjsFieldMetadata {
160        self.field
161    }
162
163    pub fn yjs_rule(&self) -> Result<YjsFieldRule> {
164        Ok(YjsFieldRule {
165            table: self.metadata.name.to_string(),
166            field: self.field.field.to_string(),
167            state_column: self.field.state_column.to_string(),
168            container_key: Some(self.field.container_key.to_string()),
169            row_id_field: Some(self.field.row_id_field.to_string()),
170            kind: YjsFieldKind::from_metadata(self.field.kind)?,
171        })
172    }
173
174    pub fn document_key(&self) -> String {
175        crdt_document_key(self.table(), self.row_id(), self.field())
176    }
177}
178
179pub fn crdt_document_key(table: &str, row_id: &str, field: &str) -> String {
180    format!("{table}\u{1f}{row_id}\u{1f}{field}")
181}
182
183pub fn validate_crdt_field(app_schema: AppSchema, id: &CrdtFieldId) -> Result<CrdtField> {
184    let metadata = app_schema
185        .table_metadata(&id.table)
186        .ok_or_else(|| SyncularError::config(format!("unknown app table: {}", id.table)))?;
187    let field = metadata
188        .crdt_yjs_fields
189        .iter()
190        .find(|field| field.field == id.field)
191        .ok_or_else(|| {
192            SyncularError::config(format!(
193                "no CRDT Yjs field metadata for {}.{}",
194                id.table, id.field
195            ))
196        })?;
197    validate_crdt_field_metadata(metadata, field)?;
198    Ok(CrdtField {
199        id: id.clone(),
200        metadata,
201        field,
202        sync_mode: sync_mode_from_metadata(field)?,
203    })
204}
205
206fn validate_crdt_field_metadata(
207    metadata: &AppTableMetadata,
208    field: &CrdtYjsFieldMetadata,
209) -> Result<()> {
210    if metadata.name.trim().is_empty() {
211        return Err(SyncularError::config(
212            "CRDT field metadata cannot reference an empty table name",
213        ));
214    }
215    let primary_key = metadata_column(metadata, metadata.primary_key_column, "primaryKeyColumn")?;
216    if !primary_key.primary_key {
217        return Err(SyncularError::config(format!(
218            "CRDT field metadata for {} expects primary key column {} to be marked primary",
219            metadata.name, metadata.primary_key_column
220        )));
221    }
222    metadata_column(
223        metadata,
224        metadata.server_version_column,
225        "serverVersionColumn",
226    )?;
227    if field.field.trim().is_empty() {
228        return Err(SyncularError::config(format!(
229            "CRDT field metadata for {} has an empty field name",
230            metadata.name
231        )));
232    }
233    if field.state_column.trim().is_empty() {
234        return Err(SyncularError::config(format!(
235            "CRDT field metadata for {}.{} has an empty state column",
236            metadata.name, field.field
237        )));
238    }
239    if field.container_key.trim().is_empty() {
240        return Err(SyncularError::config(format!(
241            "CRDT field metadata for {}.{} has an empty container key",
242            metadata.name, field.field
243        )));
244    }
245    if field.row_id_field.trim().is_empty() {
246        return Err(SyncularError::config(format!(
247            "CRDT field metadata for {}.{} has an empty row id field",
248            metadata.name, field.field
249        )));
250    }
251    YjsFieldKind::from_metadata(field.kind)?;
252    sync_mode_from_metadata(field)?;
253    let value_column = metadata_column(metadata, field.field, "CRDT field")?;
254    if value_column.type_family != "text" {
255        return Err(SyncularError::config(format!(
256            "CRDT field metadata for {}.{} must use a text column, got {}",
257            metadata.name, field.field, value_column.type_family
258        )));
259    }
260    if value_column.primary_key {
261        return Err(SyncularError::config(format!(
262            "CRDT field metadata for {}.{} cannot use the primary key column",
263            metadata.name, field.field
264        )));
265    }
266    if field.field == metadata.server_version_column {
267        return Err(SyncularError::config(format!(
268            "CRDT field metadata for {}.{} cannot use the server version column",
269            metadata.name, field.field
270        )));
271    }
272    if metadata
273        .soft_delete_column
274        .is_some_and(|soft_delete_column| field.field == soft_delete_column)
275    {
276        return Err(SyncularError::config(format!(
277            "CRDT field metadata for {}.{} cannot use the soft delete column",
278            metadata.name, field.field
279        )));
280    }
281    let state_column = metadata_column(metadata, field.state_column, "CRDT stateColumn")?;
282    if state_column.type_family != "text" {
283        return Err(SyncularError::config(format!(
284            "CRDT field metadata for {}.{} state column {} must use a text column, got {}",
285            metadata.name, field.field, field.state_column, state_column.type_family
286        )));
287    }
288    if field.state_column == field.field {
289        return Err(SyncularError::config(format!(
290            "CRDT field metadata for {}.{} cannot use the same field and state column",
291            metadata.name, field.field
292        )));
293    }
294    if field.state_column == metadata.server_version_column {
295        return Err(SyncularError::config(format!(
296            "CRDT field metadata for {}.{} cannot use the server version column as state column",
297            metadata.name, field.field
298        )));
299    }
300    if field.row_id_field != metadata.primary_key_column {
301        return Err(SyncularError::config(format!(
302            "CRDT field metadata for {}.{} uses row id field {}, expected primary key {}",
303            metadata.name, field.field, field.row_id_field, metadata.primary_key_column
304        )));
305    }
306    metadata_column(metadata, field.row_id_field, "CRDT rowIdField")?;
307    for scope in metadata.scopes {
308        if scope.name.trim().is_empty() {
309            return Err(SyncularError::config(format!(
310                "CRDT field metadata for {}.{} has an empty scope name",
311                metadata.name, field.field
312            )));
313        }
314        metadata_column(metadata, scope.column, "scope column")?;
315    }
316    if let Some(encrypted_field) = metadata.encrypted_fields.iter().find(|encrypted_field| {
317        encrypted_field.field == field.field || encrypted_field.field == field.state_column
318    }) {
319        return Err(SyncularError::config(format!(
320            "CRDT field metadata for {}.{} conflicts with encrypted field {}; use encrypted-update-log CRDT fields instead of field-level encryption",
321            metadata.name, field.field, encrypted_field.field
322        )));
323    }
324    Ok(())
325}
326
327fn metadata_column<'a>(
328    metadata: &'a AppTableMetadata,
329    column: &str,
330    role: &str,
331) -> Result<&'a ColumnMetadata> {
332    if column.trim().is_empty() {
333        return Err(SyncularError::config(format!(
334            "CRDT field metadata for {} has an empty {role}",
335            metadata.name
336        )));
337    }
338    metadata
339        .columns
340        .iter()
341        .find(|candidate| candidate.name == column)
342        .ok_or_else(|| {
343            SyncularError::config(format!(
344                "CRDT field metadata for {} references unknown {role} {}",
345                metadata.name, column
346            ))
347        })
348}
349
350fn sync_mode_from_metadata(field: &CrdtYjsFieldMetadata) -> Result<CrdtFieldSyncMode> {
351    match field.sync_mode {
352        "" | "server-merge" => Ok(CrdtFieldSyncMode::ServerMerge),
353        "encrypted-update-log" if is_encrypted_update_log_field(field) => {
354            Ok(CrdtFieldSyncMode::EncryptedUpdateLog)
355        }
356        other => Err(SyncularError::config(format!(
357            "unsupported CRDT field sync mode for {}: {other}",
358            field.field
359        ))),
360    }
361}