1use crate::app_schema::{AppSchema, AppTableMetadata, ColumnMetadata, CrdtYjsFieldMetadata};
2use crate::crdt_yjs::{YjsFieldKind, YjsFieldRule};
3use crate::encrypted_crdt::is_encrypted_update_log_field;
4use crate::error::{Result, SyncularError};
5use serde::{Deserialize, Serialize};
6
/// Identifies one CRDT-backed field by table, row, and field name.
///
/// Serialized with camelCase keys (`table`, `rowId`, `field`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtFieldId {
    /// App table name; `validate_crdt_field` looks this up in the schema.
    pub table: String,
    /// Identifier of the row that owns the field.
    pub row_id: String,
    /// Field name; `validate_crdt_field` matches it against the table's CRDT Yjs field metadata.
    pub field: String,
}
14
/// Synchronization mode of a CRDT field.
///
/// Serialized in kebab-case (`server-merge`, `encrypted-update-log`), the same
/// spellings that `sync_mode_from_metadata` accepts from schema metadata.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum CrdtFieldSyncMode {
    /// Chosen when the metadata sync mode is empty or `server-merge`.
    ServerMerge,
    /// Chosen when the metadata sync mode is `encrypted-update-log` and
    /// `is_encrypted_update_log_field` accepts the field.
    EncryptedUpdateLog,
}
21
/// Origin tag recorded on a CRDT update log entry.
///
/// Serialized in kebab-case (`local`, `remote`, `compaction`).
// NOTE(review): the exact meaning of each variant is defined by the code that
// writes the update log, which is not visible in this file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum CrdtUpdateOrigin {
    /// Presumably an update produced on this client — TODO confirm.
    Local,
    /// Presumably an update received from another peer or the server — TODO confirm.
    Remote,
    /// Presumably an update produced while compacting the log — TODO confirm.
    Compaction,
}
29
/// Status tag recorded on a CRDT update log entry.
///
/// Serialized in kebab-case (`pending`, `flushed`, `acked`, `pruned`).
// NOTE(review): the transitions between these states are driven by code that
// is not visible in this file; the per-variant notes below are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum CrdtUpdateStatus {
    /// Presumably recorded but not yet flushed — TODO confirm.
    Pending,
    /// Presumably flushed; likely pairs with `CrdtUpdateLogEntry::flushed_at` — TODO confirm.
    Flushed,
    /// Presumably acknowledged; likely pairs with `CrdtUpdateLogEntry::acked_at` — TODO confirm.
    Acked,
    /// Presumably removed from the active log — TODO confirm.
    Pruned,
}
38
/// A CRDT field identifier paired with the static schema metadata it resolved to.
///
/// Instances are produced by `validate_crdt_field`; the fields are private and
/// exposed through the accessor methods on this type.
#[derive(Debug, Clone)]
pub struct CrdtField {
    /// The identifier the caller asked for (cloned during validation).
    id: CrdtFieldId,
    /// Metadata of the table named by `id.table`.
    metadata: &'static AppTableMetadata,
    /// CRDT Yjs metadata entry whose `field` equals `id.field`.
    field: &'static CrdtYjsFieldMetadata,
    /// Sync mode derived from the field metadata by `sync_mode_from_metadata`.
    sync_mode: CrdtFieldSyncMode,
}
46
/// Serializable snapshot of one CRDT document together with its update-log counters.
///
/// Serialized with camelCase keys.
// NOTE(review): this type is only declared here; the field notes below that go
// beyond the names and types are assumptions to confirm against the producer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtDocumentSnapshot {
    /// Presumably the key built by `crdt_document_key` — TODO confirm.
    pub document_key: String,
    /// Table name of the document's field.
    pub table: String,
    /// Row identifier of the document's field.
    pub row_id: String,
    /// Field name of the document.
    pub field: String,
    /// Name of the column holding the document state.
    pub state_column: String,
    /// Sync mode of the field.
    pub sync_mode: CrdtFieldSyncMode,
    /// Document state, base64-encoded judging by the name; optional.
    pub state_base64: Option<String>,
    /// State vector, base64-encoded judging by the name.
    pub state_vector_base64: String,
    /// Count of log updates; presumably those with status `pending` — TODO confirm.
    pub pending_updates: i64,
    /// Count of log updates; presumably those with status `flushed` — TODO confirm.
    pub flushed_updates: i64,
    /// Count of log updates; presumably those with status `acked` — TODO confirm.
    pub acked_updates: i64,
    /// Count of log updates; presumably the total in the log — TODO confirm.
    pub log_updates: i64,
    /// Last-update timestamp; unit not visible here — TODO confirm.
    pub updated_at: i64,
    /// Timestamp of the last compaction, if any; unit not visible here — TODO confirm.
    pub compacted_at: Option<i64>,
}
65
/// Update counters, state vector, and timestamps of a CRDT field.
///
/// Can be built from a `CrdtDocumentSnapshot` through `From`, which copies the
/// identically named snapshot fields. Serialized with camelCase keys.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtFieldCompactionStats {
    /// Same value as `CrdtDocumentSnapshot::pending_updates` when converted from a snapshot.
    pub pending_updates: i64,
    /// Same value as `CrdtDocumentSnapshot::flushed_updates` when converted from a snapshot.
    pub flushed_updates: i64,
    /// Same value as `CrdtDocumentSnapshot::acked_updates` when converted from a snapshot.
    pub acked_updates: i64,
    /// Same value as `CrdtDocumentSnapshot::log_updates` when converted from a snapshot.
    pub log_updates: i64,
    /// State vector, base64-encoded judging by the name.
    pub state_vector_base64: String,
    /// Last-update timestamp; unit not visible here — TODO confirm.
    pub updated_at: i64,
    /// Timestamp of the last compaction, if any; unit not visible here — TODO confirm.
    pub compacted_at: Option<i64>,
}
77
/// One serializable entry of a CRDT update log.
///
/// Serialized with camelCase keys.
// NOTE(review): this type is only declared here; field notes beyond names and
// types are assumptions to confirm against the code that writes the log.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CrdtUpdateLogEntry {
    /// Numeric identifier of the entry; presumably a storage row id — TODO confirm.
    pub id: i64,
    /// Presumably the key built by `crdt_document_key` — TODO confirm.
    pub document_key: String,
    /// Identifier of the update itself.
    pub update_id: String,
    /// Optional identifier of a client commit associated with the update.
    pub client_commit_id: Option<String>,
    /// Origin tag of the update.
    pub origin: CrdtUpdateOrigin,
    /// Status tag of the update.
    pub status: CrdtUpdateStatus,
    /// Update payload, base64-encoded judging by the name.
    pub update_base64: String,
    /// State vector, base64-encoded judging by the name.
    pub state_vector_base64: String,
    /// Creation timestamp; unit not visible here — TODO confirm.
    pub created_at: i64,
    /// Timestamp of flushing, if it happened.
    pub flushed_at: Option<i64>,
    /// Timestamp of acknowledgement, if it happened.
    pub acked_at: Option<i64>,
}
93
94impl From<&CrdtDocumentSnapshot> for CrdtFieldCompactionStats {
95 fn from(snapshot: &CrdtDocumentSnapshot) -> Self {
96 Self {
97 pending_updates: snapshot.pending_updates,
98 flushed_updates: snapshot.flushed_updates,
99 acked_updates: snapshot.acked_updates,
100 log_updates: snapshot.log_updates,
101 state_vector_base64: snapshot.state_vector_base64.clone(),
102 updated_at: snapshot.updated_at,
103 compacted_at: snapshot.compacted_at,
104 }
105 }
106}
107
108impl CrdtFieldId {
109 pub fn new(
110 table: impl Into<String>,
111 row_id: impl Into<String>,
112 field: impl Into<String>,
113 ) -> Self {
114 Self {
115 table: table.into(),
116 row_id: row_id.into(),
117 field: field.into(),
118 }
119 }
120}
121
122impl CrdtField {
123 pub fn id(&self) -> &CrdtFieldId {
124 &self.id
125 }
126
127 pub fn table(&self) -> &'static str {
128 self.metadata.name
129 }
130
131 pub fn row_id(&self) -> &str {
132 &self.id.row_id
133 }
134
135 pub fn field(&self) -> &'static str {
136 self.field.field
137 }
138
139 pub fn state_column(&self) -> &'static str {
140 self.field.state_column
141 }
142
143 pub fn container_key(&self) -> &'static str {
144 self.field.container_key
145 }
146
147 pub fn row_id_field(&self) -> &'static str {
148 self.field.row_id_field
149 }
150
151 pub fn sync_mode(&self) -> CrdtFieldSyncMode {
152 self.sync_mode
153 }
154
155 pub fn metadata(&self) -> &'static AppTableMetadata {
156 self.metadata
157 }
158
159 pub fn field_metadata(&self) -> &'static CrdtYjsFieldMetadata {
160 self.field
161 }
162
163 pub fn yjs_rule(&self) -> Result<YjsFieldRule> {
164 Ok(YjsFieldRule {
165 table: self.metadata.name.to_string(),
166 field: self.field.field.to_string(),
167 state_column: self.field.state_column.to_string(),
168 container_key: Some(self.field.container_key.to_string()),
169 row_id_field: Some(self.field.row_id_field.to_string()),
170 kind: YjsFieldKind::from_metadata(self.field.kind)?,
171 })
172 }
173
174 pub fn document_key(&self) -> String {
175 crdt_document_key(self.table(), self.row_id(), self.field())
176 }
177}
178
/// Builds the document key for a CRDT field.
///
/// The key is `table`, `row_id`, and `field` joined by the ASCII unit
/// separator (U+001F). Components are not escaped, so a component that
/// itself contains U+001F yields an ambiguous key.
pub fn crdt_document_key(table: &str, row_id: &str, field: &str) -> String {
    const UNIT_SEPARATOR: &str = "\u{1f}";
    [table, row_id, field].join(UNIT_SEPARATOR)
}
182
183pub fn validate_crdt_field(app_schema: AppSchema, id: &CrdtFieldId) -> Result<CrdtField> {
184 let metadata = app_schema
185 .table_metadata(&id.table)
186 .ok_or_else(|| SyncularError::config(format!("unknown app table: {}", id.table)))?;
187 let field = metadata
188 .crdt_yjs_fields
189 .iter()
190 .find(|field| field.field == id.field)
191 .ok_or_else(|| {
192 SyncularError::config(format!(
193 "no CRDT Yjs field metadata for {}.{}",
194 id.table, id.field
195 ))
196 })?;
197 validate_crdt_field_metadata(metadata, field)?;
198 Ok(CrdtField {
199 id: id.clone(),
200 metadata,
201 field,
202 sync_mode: sync_mode_from_metadata(field)?,
203 })
204}
205
206fn validate_crdt_field_metadata(
207 metadata: &AppTableMetadata,
208 field: &CrdtYjsFieldMetadata,
209) -> Result<()> {
210 if metadata.name.trim().is_empty() {
211 return Err(SyncularError::config(
212 "CRDT field metadata cannot reference an empty table name",
213 ));
214 }
215 let primary_key = metadata_column(metadata, metadata.primary_key_column, "primaryKeyColumn")?;
216 if !primary_key.primary_key {
217 return Err(SyncularError::config(format!(
218 "CRDT field metadata for {} expects primary key column {} to be marked primary",
219 metadata.name, metadata.primary_key_column
220 )));
221 }
222 metadata_column(
223 metadata,
224 metadata.server_version_column,
225 "serverVersionColumn",
226 )?;
227 if field.field.trim().is_empty() {
228 return Err(SyncularError::config(format!(
229 "CRDT field metadata for {} has an empty field name",
230 metadata.name
231 )));
232 }
233 if field.state_column.trim().is_empty() {
234 return Err(SyncularError::config(format!(
235 "CRDT field metadata for {}.{} has an empty state column",
236 metadata.name, field.field
237 )));
238 }
239 if field.container_key.trim().is_empty() {
240 return Err(SyncularError::config(format!(
241 "CRDT field metadata for {}.{} has an empty container key",
242 metadata.name, field.field
243 )));
244 }
245 if field.row_id_field.trim().is_empty() {
246 return Err(SyncularError::config(format!(
247 "CRDT field metadata for {}.{} has an empty row id field",
248 metadata.name, field.field
249 )));
250 }
251 YjsFieldKind::from_metadata(field.kind)?;
252 sync_mode_from_metadata(field)?;
253 let value_column = metadata_column(metadata, field.field, "CRDT field")?;
254 if value_column.type_family != "text" {
255 return Err(SyncularError::config(format!(
256 "CRDT field metadata for {}.{} must use a text column, got {}",
257 metadata.name, field.field, value_column.type_family
258 )));
259 }
260 if value_column.primary_key {
261 return Err(SyncularError::config(format!(
262 "CRDT field metadata for {}.{} cannot use the primary key column",
263 metadata.name, field.field
264 )));
265 }
266 if field.field == metadata.server_version_column {
267 return Err(SyncularError::config(format!(
268 "CRDT field metadata for {}.{} cannot use the server version column",
269 metadata.name, field.field
270 )));
271 }
272 if metadata
273 .soft_delete_column
274 .is_some_and(|soft_delete_column| field.field == soft_delete_column)
275 {
276 return Err(SyncularError::config(format!(
277 "CRDT field metadata for {}.{} cannot use the soft delete column",
278 metadata.name, field.field
279 )));
280 }
281 let state_column = metadata_column(metadata, field.state_column, "CRDT stateColumn")?;
282 if state_column.type_family != "text" {
283 return Err(SyncularError::config(format!(
284 "CRDT field metadata for {}.{} state column {} must use a text column, got {}",
285 metadata.name, field.field, field.state_column, state_column.type_family
286 )));
287 }
288 if field.state_column == field.field {
289 return Err(SyncularError::config(format!(
290 "CRDT field metadata for {}.{} cannot use the same field and state column",
291 metadata.name, field.field
292 )));
293 }
294 if field.state_column == metadata.server_version_column {
295 return Err(SyncularError::config(format!(
296 "CRDT field metadata for {}.{} cannot use the server version column as state column",
297 metadata.name, field.field
298 )));
299 }
300 if field.row_id_field != metadata.primary_key_column {
301 return Err(SyncularError::config(format!(
302 "CRDT field metadata for {}.{} uses row id field {}, expected primary key {}",
303 metadata.name, field.field, field.row_id_field, metadata.primary_key_column
304 )));
305 }
306 metadata_column(metadata, field.row_id_field, "CRDT rowIdField")?;
307 for scope in metadata.scopes {
308 if scope.name.trim().is_empty() {
309 return Err(SyncularError::config(format!(
310 "CRDT field metadata for {}.{} has an empty scope name",
311 metadata.name, field.field
312 )));
313 }
314 metadata_column(metadata, scope.column, "scope column")?;
315 }
316 if let Some(encrypted_field) = metadata.encrypted_fields.iter().find(|encrypted_field| {
317 encrypted_field.field == field.field || encrypted_field.field == field.state_column
318 }) {
319 return Err(SyncularError::config(format!(
320 "CRDT field metadata for {}.{} conflicts with encrypted field {}; use encrypted-update-log CRDT fields instead of field-level encryption",
321 metadata.name, field.field, encrypted_field.field
322 )));
323 }
324 Ok(())
325}
326
327fn metadata_column<'a>(
328 metadata: &'a AppTableMetadata,
329 column: &str,
330 role: &str,
331) -> Result<&'a ColumnMetadata> {
332 if column.trim().is_empty() {
333 return Err(SyncularError::config(format!(
334 "CRDT field metadata for {} has an empty {role}",
335 metadata.name
336 )));
337 }
338 metadata
339 .columns
340 .iter()
341 .find(|candidate| candidate.name == column)
342 .ok_or_else(|| {
343 SyncularError::config(format!(
344 "CRDT field metadata for {} references unknown {role} {}",
345 metadata.name, column
346 ))
347 })
348}
349
350fn sync_mode_from_metadata(field: &CrdtYjsFieldMetadata) -> Result<CrdtFieldSyncMode> {
351 match field.sync_mode {
352 "" | "server-merge" => Ok(CrdtFieldSyncMode::ServerMerge),
353 "encrypted-update-log" if is_encrypted_update_log_field(field) => {
354 Ok(CrdtFieldSyncMode::EncryptedUpdateLog)
355 }
356 other => Err(SyncularError::config(format!(
357 "unsupported CRDT field sync mode for {}: {other}",
358 field.field
359 ))),
360 }
361}