// Source: syncular_runtime/core/encrypted_crdt.rs

1use crate::app_schema::{AppTableMetadata, CrdtYjsFieldMetadata};
2use crate::crdt_yjs::{
3    apply_yjs_updates_to_state, build_yjs_text_update, materialize_yjs_state,
4    validate_yjs_state_base64_size, validate_yjs_text_input_size,
5    validate_yjs_update_envelope_size, yjs_state_vector_base64, ApplyYjsUpdatesToStateArgs,
6    BuildYjsTextUpdateArgs, YjsFieldKind, YjsFieldRule, YjsUpdateEnvelope,
7};
8use crate::encryption::{
9    random_bytes, validate_32_bytes, xchacha_decrypt, xchacha_encrypt, FieldEncryptionContext,
10    FieldEncryptionKeyProvider, FieldEncryptionTarget, StaticFieldEncryptionKeys,
11};
12use crate::error::{Result, SyncularError};
13use crate::protocol::{PendingSyncularMutation, PullResponse, SyncChange, SyncularMutationKind};
14use base64::engine::general_purpose::URL_SAFE_NO_PAD;
15use base64::Engine as _;
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Map, Value};
18use std::collections::BTreeMap;
19use std::sync::Arc;
20use uuid::Uuid;
21
/// Name of the system table that holds encrypted CRDT update rows.
pub const CRDT_UPDATES_TABLE: &str = "sync_crdt_updates";
/// Name of the system table that holds encrypted CRDT checkpoint rows.
pub const CRDT_CHECKPOINTS_TABLE: &str = "sync_crdt_checkpoints";
/// Envelope prefix placed in front of every ciphertext string produced by this module;
/// decryption rejects any ciphertext that does not start with it.
const CRDT_CIPHERTEXT_PREFIX: &str = "dgsync:ecrdt:1:";
25
26pub fn is_encrypted_crdt_system_table(table: &str) -> bool {
27    matches!(table, CRDT_UPDATES_TABLE | CRDT_CHECKPOINTS_TABLE)
28}
29
30pub fn encrypted_crdt_identity_column(table: &str) -> Result<&'static str> {
31    match table {
32        CRDT_UPDATES_TABLE => Ok("update_id"),
33        CRDT_CHECKPOINTS_TABLE => Ok("checkpoint_id"),
34        _ => Err(SyncularError::config(format!(
35            "unknown encrypted CRDT system table: {table}"
36        ))),
37    }
38}
39
40pub fn encrypted_crdt_normalize_row(
41    table: &str,
42    row_id: &str,
43    row: Option<&Value>,
44) -> Result<Map<String, Value>> {
45    let mut obj = match row {
46        Some(Value::Object(obj)) => obj.clone(),
47        Some(other) => {
48            return Err(SyncularError::protocol_message(format!(
49                "encrypted CRDT row for {table} is not an object: {other}"
50            )));
51        }
52        None => Map::new(),
53    };
54    let identity_column = encrypted_crdt_identity_column(table)?;
55    obj.entry(identity_column.to_string())
56        .or_insert_with(|| Value::String(row_id.to_string()));
57    required_string(&obj, "stream_id")?;
58    required_string(&obj, "app_table")?;
59    required_string(&obj, "row_id")?;
60    required_string(&obj, "field_name")?;
61    required_string(&obj, identity_column)?;
62    required_string(&obj, "key_id")?;
63    required_string(&obj, "ciphertext")?;
64    if table == CRDT_CHECKPOINTS_TABLE {
65        let covers_seq = obj
66            .get("covers_seq")
67            .and_then(Value::as_i64)
68            .ok_or_else(|| {
69                SyncularError::protocol_message(
70                    "encrypted CRDT checkpoint covers_seq must be an integer",
71                )
72            })?;
73        if covers_seq < 0 {
74            return Err(SyncularError::protocol_message(
75                "encrypted CRDT checkpoint covers_seq must be non-negative",
76            ));
77        }
78    }
79    Ok(obj)
80}
81
82pub fn encrypted_crdt_scopes_json(row: &Map<String, Value>) -> Result<String> {
83    let scopes = row
84        .get("scopes")
85        .cloned()
86        .unwrap_or_else(|| Value::Object(Map::new()));
87    if !scopes.is_object() {
88        return Err(SyncularError::protocol_message(
89            "encrypted CRDT scopes must be an object",
90        ));
91    }
92    Ok(serde_json::to_string(&scopes)?)
93}
94
/// Serializable configuration used to build an [`EncryptedCrdt`] from static key material.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StaticEncryptedCrdtConfig {
    /// Key material handed to `StaticFieldEncryptionKeys::from_key_material`.
    /// NOTE(review): presumably key id -> encoded key; confirm the expected encoding
    /// against that helper.
    pub keys: BTreeMap<String, String>,
    /// Optional key id forwarded to `StaticFieldEncryptionKeys::from_key_material`;
    /// omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub encryption_kid: Option<String>,
    /// Optional partition id; `EncryptedCrdt::from_static_config` uses `"default"`
    /// when this is `None`. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partition_id: Option<String>,
}
104
/// Encrypts outgoing and decrypts incoming rows of the encrypted CRDT system tables.
#[derive(Clone)]
pub struct EncryptedCrdt {
    /// Provider queried for the encryption key id (`encryption_kid`) and the key
    /// bytes (`get_key`).
    keys: Arc<dyn FieldEncryptionKeyProvider>,
    /// Partition id written into outgoing payloads and bound into their AAD.
    partition_id: String,
}
110
/// Counters describing one encrypted CRDT stream; (de)serialized with camelCase keys.
///
/// NOTE(review): nothing in the visible code populates this struct, so the field
/// notes below are inferred from the names only — confirm against the producer.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct EncryptedCrdtStreamStats {
    /// Presumably the number of update rows in the stream.
    pub update_count: i64,
    /// Presumably the number of checkpoint rows in the stream.
    pub checkpoint_count: i64,
    /// Presumably the number of updates a new checkpoint could cover — TODO confirm.
    pub checkpointable_update_count: i64,
    /// Presumably the highest server sequence seen for the stream, if any.
    pub max_server_seq: Option<i64>,
    /// Presumably the `covers_seq` of the most recent checkpoint, if any.
    pub latest_checkpoint_covers_seq: Option<i64>,
}
120
/// Arguments for [`EncryptedCrdt::build_text_update_mutation`].
#[derive(Debug, Clone)]
pub struct BuildEncryptedCrdtTextUpdateArgs<'a> {
    /// Encryption context: passed to the key provider, and its `actor_id` /
    /// `client_id` are copied into the outgoing payload.
    pub ctx: FieldEncryptionContext,
    /// Metadata of the app table that owns the field.
    pub metadata: &'static AppTableMetadata,
    /// Name of the encrypted Yjs field to update.
    pub field: &'a str,
    /// Id of the app row being updated.
    pub row_id: &'a str,
    /// Current app row; must be a JSON object. Read for the previous Yjs state and
    /// the scope columns.
    pub existing_row: &'a Value,
    /// Text the field should hold after the update.
    pub next_text: &'a str,
}
130
/// Arguments for [`EncryptedCrdt::build_yjs_update_mutation`].
#[derive(Debug, Clone)]
pub struct BuildEncryptedCrdtYjsUpdateArgs<'a> {
    /// Encryption context: passed to the key provider, and its `actor_id` /
    /// `client_id` are copied into the outgoing payload.
    pub ctx: FieldEncryptionContext,
    /// Metadata of the app table that owns the field.
    pub metadata: &'static AppTableMetadata,
    /// Name of the encrypted Yjs field to update.
    pub field: &'a str,
    /// Id of the app row being updated.
    pub row_id: &'a str,
    /// Current app row; must be a JSON object. Read for the scope columns.
    pub existing_row: &'a Value,
    /// Yjs update envelope to encrypt; its `update_id` becomes the system row id.
    pub update: YjsUpdateEnvelope,
}
140
/// Arguments for [`EncryptedCrdt::build_checkpoint_mutation`].
#[derive(Debug, Clone)]
pub struct BuildEncryptedCrdtCheckpointArgs<'a> {
    /// Encryption context: passed to the key provider, and its `actor_id` /
    /// `client_id` are copied into the outgoing payload.
    pub ctx: FieldEncryptionContext,
    /// Metadata of the app table that owns the field.
    pub metadata: &'static AppTableMetadata,
    /// Name of the encrypted Yjs field to checkpoint.
    pub field: &'a str,
    /// Id of the app row being checkpointed.
    pub row_id: &'a str,
    /// Current app row; must be a JSON object. Supplies the Yjs state and scope columns.
    pub existing_row: &'a Value,
    /// Value written to the checkpoint's `covers_seq` column; must be non-negative.
    pub covers_seq: i64,
}
150
151impl EncryptedCrdt {
152    pub fn new(keys: Arc<dyn FieldEncryptionKeyProvider>) -> Self {
153        Self {
154            keys,
155            partition_id: "default".to_string(),
156        }
157    }
158
159    pub fn with_partition_id(
160        keys: Arc<dyn FieldEncryptionKeyProvider>,
161        partition_id: impl Into<String>,
162    ) -> Result<Self> {
163        let partition_id = partition_id.into();
164        if partition_id.trim().is_empty() {
165            return Err(SyncularError::config(
166                "encrypted CRDT partition_id cannot be empty",
167            ));
168        }
169        Ok(Self { keys, partition_id })
170    }
171
172    pub fn from_static_config(config: StaticEncryptedCrdtConfig) -> Result<Self> {
173        let keys =
174            StaticFieldEncryptionKeys::from_key_material(config.keys, config.encryption_kid)?;
175        Self::with_partition_id(
176            Arc::new(keys),
177            config.partition_id.unwrap_or_else(|| "default".to_string()),
178        )
179    }
180
181    pub fn from_static_config_json(config_json: &str) -> Result<Option<Self>> {
182        let trimmed = config_json.trim();
183        if trimmed.is_empty() || trimmed == "null" {
184            return Ok(None);
185        }
186        let config: StaticEncryptedCrdtConfig = serde_json::from_str(trimmed)?;
187        Ok(Some(Self::from_static_config(config)?))
188    }
189
190    pub fn partition_id(&self) -> &str {
191        &self.partition_id
192    }
193
194    pub fn transform_pull_response(&self, mut response: PullResponse) -> Result<PullResponse> {
195        for sub in &mut response.subscriptions {
196            if let Some(snapshots) = &mut sub.snapshots {
197                for snapshot in snapshots {
198                    if !is_encrypted_crdt_system_table(&snapshot.table) {
199                        continue;
200                    }
201                    for row in &mut snapshot.rows {
202                        *row = self.decrypt_system_row_value(&snapshot.table, row.clone())?;
203                    }
204                }
205            }
206            for commit in &mut sub.commits {
207                for change in &mut commit.changes {
208                    if !is_encrypted_crdt_system_table(&change.table) || change.op != "upsert" {
209                        continue;
210                    }
211                    if let Some(row_json) = change.row_json.take() {
212                        change.row_json =
213                            Some(self.decrypt_system_row_value(&change.table, row_json)?);
214                    }
215                }
216            }
217        }
218        Ok(response)
219    }
220
221    pub fn transform_change(&self, mut change: SyncChange) -> Result<SyncChange> {
222        if is_encrypted_crdt_system_table(&change.table) && change.op == "upsert" {
223            if let Some(row_json) = change.row_json.take() {
224                change.row_json = Some(self.decrypt_system_row_value(&change.table, row_json)?);
225            }
226        }
227        Ok(change)
228    }
229
230    pub fn build_text_update_mutation(
231        &self,
232        args: BuildEncryptedCrdtTextUpdateArgs<'_>,
233    ) -> Result<PendingSyncularMutation> {
234        validate_yjs_text_input_size(args.next_text)?;
235        let field = encrypted_field_metadata(args.metadata, args.field)?;
236        if field.kind != "text" {
237            return Err(SyncularError::config(format!(
238                "encrypted CRDT text updates require a text Yjs field, got {}.{} kind {}",
239                args.metadata.name, field.field, field.kind
240            )));
241        }
242        let existing = args.existing_row.as_object().ok_or_else(|| {
243            SyncularError::protocol_message("encrypted CRDT update existing_row must be an object")
244        })?;
245        let previous_state_base64 = existing
246            .get(field.state_column)
247            .and_then(Value::as_str)
248            .filter(|value| !value.is_empty())
249            .map(str::to_string);
250        let requires_state_vector_base64 = previous_state_base64
251            .as_deref()
252            .map(|state| yjs_state_vector_base64(Some(state)))
253            .transpose()?;
254        let mut update = build_yjs_text_update(BuildYjsTextUpdateArgs {
255            previous_state_base64,
256            next_text: args.next_text.to_string(),
257            container_key: Some(field.container_key.to_string()),
258            update_id: None,
259        })?;
260        update.update.requires_state_vector_base64 = requires_state_vector_base64;
261        self.build_yjs_update_mutation(BuildEncryptedCrdtYjsUpdateArgs {
262            ctx: args.ctx,
263            metadata: args.metadata,
264            field: args.field,
265            row_id: args.row_id,
266            existing_row: args.existing_row,
267            update: update.update,
268        })
269    }
270
271    pub fn build_yjs_update_mutation(
272        &self,
273        args: BuildEncryptedCrdtYjsUpdateArgs<'_>,
274    ) -> Result<PendingSyncularMutation> {
275        validate_yjs_update_envelope_size(&args.update)?;
276        if args.update.update_id.trim().is_empty() {
277            return Err(SyncularError::protocol_message(
278                "encrypted CRDT update.updateId must be a non-empty string",
279            ));
280        }
281        if args.update.update_base64.trim().is_empty() {
282            return Err(SyncularError::protocol_message(
283                "encrypted CRDT update.updateBase64 must be a non-empty base64 string",
284            ));
285        }
286        let field = encrypted_field_metadata(args.metadata, args.field)?;
287        let existing = args.existing_row.as_object().ok_or_else(|| {
288            SyncularError::protocol_message("encrypted CRDT update existing_row must be an object")
289        })?;
290        let scopes = scopes_from_app_row(args.metadata, existing)?;
291        let stream_id = encrypted_crdt_stream_id(args.metadata.name, args.row_id, field.field);
292        let target = FieldEncryptionTarget {
293            scope: args.metadata.name.to_string(),
294            table: args.metadata.name.to_string(),
295            row_id: args.row_id.to_string(),
296            field: field.field.to_string(),
297        };
298        let key_id = self.keys.encryption_kid(&args.ctx, &target)?;
299        let key = self.keys.get_key(&key_id)?;
300        validate_32_bytes("encrypted CRDT key", &key)?;
301        let plaintext = serde_json::to_vec(&EncryptedCrdtPlaintext::YjsUpdateV1 {
302            update_base64: args.update.update_base64.clone(),
303            requires_state_vector_base64: args.update.requires_state_vector_base64.clone(),
304        })?;
305        let aad = encrypted_crdt_aad(
306            CRDT_UPDATES_TABLE,
307            &self.partition_id,
308            &stream_id,
309            args.metadata.name,
310            args.row_id,
311            field.field,
312            &args.update.update_id,
313        );
314        let ciphertext = encrypt_payload(&key, &aad, &plaintext)?;
315        let payload = json!({
316            "partition_id": self.partition_id,
317            "stream_id": stream_id,
318            "app_table": args.metadata.name,
319            "row_id": args.row_id,
320            "field_name": field.field,
321            "update_id": args.update.update_id,
322            "actor_id": args.ctx.actor_id,
323            "client_id": args.ctx.client_id,
324            "key_id": key_id,
325            "ciphertext": ciphertext,
326            "scopes": Value::Object(scopes.clone())
327        });
328        let mut local_row = payload.clone();
329        local_row["update_base64"] = Value::String(args.update.update_base64);
330        if let Some(required) = &args.update.requires_state_vector_base64 {
331            local_row["requires_state_vector_base64"] = Value::String(required.clone());
332        }
333
334        Ok(PendingSyncularMutation {
335            kind: SyncularMutationKind::Upsert,
336            table: CRDT_UPDATES_TABLE.to_string(),
337            row_id: payload["update_id"].as_str().unwrap().to_string(),
338            payload: Some(payload),
339            base_version: None,
340            local_row: Some(local_row),
341        })
342    }
343
344    pub fn build_checkpoint_mutation(
345        &self,
346        args: BuildEncryptedCrdtCheckpointArgs<'_>,
347    ) -> Result<PendingSyncularMutation> {
348        if args.covers_seq < 0 {
349            return Err(SyncularError::config(
350                "encrypted CRDT checkpoint covers_seq must be non-negative",
351            ));
352        }
353        let field = encrypted_field_metadata(args.metadata, args.field)?;
354        let existing = args.existing_row.as_object().ok_or_else(|| {
355            SyncularError::protocol_message(
356                "encrypted CRDT checkpoint existing_row must be an object",
357            )
358        })?;
359        let state_base64 = checkpoint_state_base64(existing, field)?;
360        validate_yjs_state_base64_size(&state_base64)?;
361        let scopes = scopes_from_app_row(args.metadata, existing)?;
362        let stream_id = encrypted_crdt_stream_id(args.metadata.name, args.row_id, field.field);
363        let target = FieldEncryptionTarget {
364            scope: args.metadata.name.to_string(),
365            table: args.metadata.name.to_string(),
366            row_id: args.row_id.to_string(),
367            field: field.field.to_string(),
368        };
369        let key_id = self.keys.encryption_kid(&args.ctx, &target)?;
370        let key = self.keys.get_key(&key_id)?;
371        validate_32_bytes("encrypted CRDT key", &key)?;
372        let checkpoint_id = Uuid::new_v4().to_string();
373        let plaintext = serde_json::to_vec(&EncryptedCrdtPlaintext::YjsStateV1 {
374            state_base64: state_base64.clone(),
375        })?;
376        let aad = encrypted_crdt_aad(
377            CRDT_CHECKPOINTS_TABLE,
378            &self.partition_id,
379            &stream_id,
380            args.metadata.name,
381            args.row_id,
382            field.field,
383            &checkpoint_id,
384        );
385        let ciphertext = encrypt_payload(&key, &aad, &plaintext)?;
386        let payload = json!({
387            "partition_id": self.partition_id,
388            "stream_id": stream_id,
389            "app_table": args.metadata.name,
390            "row_id": args.row_id,
391            "field_name": field.field,
392            "checkpoint_id": checkpoint_id,
393            "covers_seq": args.covers_seq,
394            "actor_id": args.ctx.actor_id,
395            "client_id": args.ctx.client_id,
396            "key_id": key_id,
397            "ciphertext": ciphertext,
398            "scopes": Value::Object(scopes.clone())
399        });
400        let mut local_row = payload.clone();
401        local_row["state_base64"] = Value::String(state_base64);
402
403        Ok(PendingSyncularMutation {
404            kind: SyncularMutationKind::Upsert,
405            table: CRDT_CHECKPOINTS_TABLE.to_string(),
406            row_id: payload["checkpoint_id"].as_str().unwrap().to_string(),
407            payload: Some(payload),
408            base_version: None,
409            local_row: Some(local_row),
410        })
411    }
412
413    fn decrypt_system_row_value(&self, table: &str, value: Value) -> Result<Value> {
414        let Value::Object(mut row) = value else {
415            return Ok(value);
416        };
417        self.decrypt_system_row_in_place(table, &mut row)?;
418        Ok(Value::Object(row))
419    }
420
421    fn decrypt_system_row_in_place(&self, table: &str, row: &mut Map<String, Value>) -> Result<()> {
422        if table == CRDT_UPDATES_TABLE && row.get("update_base64").is_some() {
423            return Ok(());
424        }
425        if table == CRDT_CHECKPOINTS_TABLE && row.get("state_base64").is_some() {
426            return Ok(());
427        }
428
429        let identity_column = encrypted_crdt_identity_column(table)?;
430        let partition_id = row
431            .get("partition_id")
432            .and_then(Value::as_str)
433            .unwrap_or("default");
434        let stream_id = required_string(row, "stream_id")?;
435        let app_table = required_string(row, "app_table")?;
436        let row_id = required_string(row, "row_id")?;
437        let field_name = required_string(row, "field_name")?;
438        let identity = required_string(row, identity_column)?;
439        let key_id = required_string(row, "key_id")?;
440        let ciphertext = required_string(row, "ciphertext")?;
441        let key = self.keys.get_key(&key_id)?;
442        validate_32_bytes("encrypted CRDT key", &key)?;
443        let aad = encrypted_crdt_aad(
444            table,
445            &partition_id,
446            &stream_id,
447            &app_table,
448            &row_id,
449            &field_name,
450            &identity,
451        );
452        let plaintext = decrypt_payload(&key, &aad, &ciphertext)?;
453        match serde_json::from_slice::<EncryptedCrdtPlaintext>(&plaintext)? {
454            EncryptedCrdtPlaintext::YjsUpdateV1 {
455                update_base64,
456                requires_state_vector_base64,
457            } => {
458                if table != CRDT_UPDATES_TABLE {
459                    return Err(SyncularError::protocol_message(
460                        "encrypted CRDT update plaintext cannot be stored in checkpoint table",
461                    ));
462                }
463                row.insert("update_base64".to_string(), Value::String(update_base64));
464                if let Some(required) = requires_state_vector_base64 {
465                    row.insert(
466                        "requires_state_vector_base64".to_string(),
467                        Value::String(required),
468                    );
469                }
470            }
471            EncryptedCrdtPlaintext::YjsStateV1 { state_base64 } => {
472                if table != CRDT_CHECKPOINTS_TABLE {
473                    return Err(SyncularError::protocol_message(
474                        "encrypted CRDT checkpoint plaintext cannot be stored in update table",
475                    ));
476                }
477                row.insert("state_base64".to_string(), Value::String(state_base64));
478            }
479        }
480        Ok(())
481    }
482}
483
484pub fn encrypted_crdt_stream_id(table: &str, row_id: &str, field: &str) -> String {
485    format!(
486        "{}:{}:{}",
487        escape_stream_part(table),
488        escape_stream_part(row_id),
489        escape_stream_part(field)
490    )
491}
492
493pub fn is_encrypted_update_log_field(field: &CrdtYjsFieldMetadata) -> bool {
494    field.sync_mode == "encrypted-update-log"
495}
496
497pub fn encrypted_field_metadata(
498    metadata: &AppTableMetadata,
499    field_name: &str,
500) -> Result<&'static CrdtYjsFieldMetadata> {
501    metadata
502        .crdt_yjs_fields
503        .iter()
504        .find(|field| field.field == field_name && is_encrypted_update_log_field(field))
505        .ok_or_else(|| {
506            SyncularError::config(format!(
507                "no encrypted CRDT Yjs field metadata for {}.{field_name}",
508                metadata.name
509            ))
510        })
511}
512
513pub fn encrypted_crdt_plaintext_update_base64(row: &Map<String, Value>) -> Option<String> {
514    row.get("update_base64")
515        .and_then(Value::as_str)
516        .filter(|value| !value.is_empty())
517        .map(str::to_string)
518}
519
520pub fn encrypted_crdt_required_state_vector_base64(row: &Map<String, Value>) -> Option<String> {
521    row.get("requires_state_vector_base64")
522        .and_then(Value::as_str)
523        .filter(|value| !value.is_empty())
524        .map(str::to_string)
525}
526
527pub fn encrypted_crdt_plaintext_state_base64(row: &Map<String, Value>) -> Option<String> {
528    row.get("state_base64")
529        .and_then(Value::as_str)
530        .filter(|value| !value.is_empty())
531        .map(str::to_string)
532}
533
534pub fn apply_encrypted_crdt_plaintext_to_row(
535    metadata: &'static AppTableMetadata,
536    field_name: &str,
537    app_row_id: &str,
538    system_table: &str,
539    system_row: &Map<String, Value>,
540    current_row: Option<Value>,
541) -> Result<Option<Value>> {
542    let field = encrypted_field_metadata(metadata, field_name)?;
543    let mut app_row = current_row
544        .and_then(|row| row.as_object().cloned())
545        .unwrap_or_default();
546    app_row.insert(
547        metadata.primary_key_column.to_string(),
548        Value::String(app_row_id.to_string()),
549    );
550    merge_scope_columns(metadata, system_row, &mut app_row)?;
551
552    let previous_state_base64 = app_row
553        .get(field.state_column)
554        .and_then(Value::as_str)
555        .filter(|value| !value.is_empty())
556        .map(str::to_string);
557    let next_state_base64 = match system_table {
558        CRDT_UPDATES_TABLE => {
559            let Some(update_base64) = encrypted_crdt_plaintext_update_base64(system_row) else {
560                return Ok(None);
561            };
562            apply_yjs_updates_to_state(ApplyYjsUpdatesToStateArgs {
563                previous_state_base64,
564                updates: vec![YjsUpdateEnvelope {
565                    update_id: system_row
566                        .get("update_id")
567                        .and_then(Value::as_str)
568                        .unwrap_or("encrypted-crdt-update")
569                        .to_string(),
570                    update_base64,
571                    requires_state_vector_base64: encrypted_crdt_required_state_vector_base64(
572                        system_row,
573                    ),
574                }],
575            })?
576            .next_state_base64
577        }
578        CRDT_CHECKPOINTS_TABLE => {
579            let Some(state_base64) = encrypted_crdt_plaintext_state_base64(system_row) else {
580                return Ok(None);
581            };
582            state_base64
583        }
584        _ => return Ok(None),
585    };
586    let rule = yjs_rule_from_metadata(metadata.name, field)?;
587    app_row.insert(
588        field.field.to_string(),
589        materialize_yjs_state(&next_state_base64, &rule)?,
590    );
591    app_row.insert(
592        field.state_column.to_string(),
593        Value::String(next_state_base64),
594    );
595    fill_required_app_defaults(metadata, &mut app_row);
596    Ok(Some(Value::Object(app_row)))
597}
598
599pub fn encrypted_crdt_row_matches_scopes(
600    row: &Map<String, Value>,
601    scopes: &Map<String, Value>,
602) -> bool {
603    if scopes.is_empty() {
604        return true;
605    }
606    let Some(stored_scopes) = row.get("scopes").and_then(Value::as_object) else {
607        return false;
608    };
609    scopes.iter().all(|(key, requested)| {
610        let Some(stored) = stored_scopes.get(key) else {
611            return false;
612        };
613        if let Value::Array(values) = requested {
614            return values.iter().any(|value| value == stored);
615        }
616        stored == requested
617    })
618}
619
620fn required_string(row: &Map<String, Value>, field: &str) -> Result<String> {
621    row.get(field)
622        .and_then(Value::as_str)
623        .filter(|value| !value.is_empty())
624        .map(str::to_string)
625        .ok_or_else(|| {
626            SyncularError::protocol_message(format!(
627                "encrypted CRDT payload field {field} must be a non-empty string"
628            ))
629        })
630}
631
/// JSON plaintext that is sealed inside an encrypted CRDT ciphertext.
///
/// Serialized as an internally tagged object: the `type` key selects the variant and
/// the variant's fields use camelCase names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum EncryptedCrdtPlaintext {
    /// Plaintext of an update row (`type = "yjs-update-v1"`).
    #[serde(rename = "yjs-update-v1", rename_all = "camelCase")]
    YjsUpdateV1 {
        /// Base64 Yjs update.
        update_base64: String,
        /// Optional base64 state vector recorded with the update; omitted when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        requires_state_vector_base64: Option<String>,
    },
    /// Plaintext of a checkpoint row (`type = "yjs-state-v1"`): the base64 Yjs state.
    #[serde(rename = "yjs-state-v1", rename_all = "camelCase")]
    YjsStateV1 { state_base64: String },
}
644
645fn scopes_from_app_row(
646    metadata: &AppTableMetadata,
647    row: &Map<String, Value>,
648) -> Result<Map<String, Value>> {
649    let mut scopes = Map::new();
650    for scope in metadata.scopes {
651        if let Some(value) = row.get(scope.column) {
652            scopes.insert(scope.name.to_string(), value.clone());
653        } else if scope.required {
654            return Err(SyncularError::protocol_message(format!(
655                "cannot build encrypted CRDT update for {} without scope column {}",
656                metadata.name, scope.column
657            )));
658        }
659    }
660    Ok(scopes)
661}
662
663fn merge_scope_columns(
664    metadata: &AppTableMetadata,
665    system_row: &Map<String, Value>,
666    app_row: &mut Map<String, Value>,
667) -> Result<()> {
668    let scopes = system_row
669        .get("scopes")
670        .and_then(Value::as_object)
671        .cloned()
672        .unwrap_or_default();
673    for scope in metadata.scopes {
674        if !app_row.contains_key(scope.column) {
675            if let Some(value) = scopes.get(scope.name) {
676                app_row.insert(scope.column.to_string(), value.clone());
677            }
678        }
679    }
680    Ok(())
681}
682
683fn fill_required_app_defaults(metadata: &AppTableMetadata, row: &mut Map<String, Value>) {
684    for column in metadata.columns {
685        if row.contains_key(column.name) {
686            continue;
687        }
688        let value = match column.type_family {
689            "integer" => Value::Number(0.into()),
690            "real" => json!(0.0),
691            "text" if column.notnull_required => Value::String(String::new()),
692            _ => Value::Null,
693        };
694        row.insert(column.name.to_string(), value);
695    }
696}
697
698fn yjs_rule_from_metadata(table: &str, field: &CrdtYjsFieldMetadata) -> Result<YjsFieldRule> {
699    Ok(YjsFieldRule {
700        table: table.to_string(),
701        field: field.field.to_string(),
702        state_column: field.state_column.to_string(),
703        container_key: Some(field.container_key.to_string()),
704        row_id_field: Some(field.row_id_field.to_string()),
705        kind: match field.kind {
706            "text" => YjsFieldKind::Text,
707            "xml-fragment" => YjsFieldKind::XmlFragment,
708            "prosemirror" => YjsFieldKind::Prosemirror,
709            other => {
710                return Err(SyncularError::config(format!(
711                    "unsupported encrypted CRDT Yjs field kind: {other}"
712                )));
713            }
714        },
715    })
716}
717
718fn checkpoint_state_base64(
719    existing: &Map<String, Value>,
720    field: &CrdtYjsFieldMetadata,
721) -> Result<String> {
722    if let Some(state) = existing
723        .get(field.state_column)
724        .and_then(Value::as_str)
725        .filter(|state| !state.is_empty())
726    {
727        return Ok(state.to_string());
728    }
729
730    if field.kind == "text" {
731        let text = existing
732            .get(field.field)
733            .and_then(Value::as_str)
734            .unwrap_or_default();
735        return Ok(build_yjs_text_update(BuildYjsTextUpdateArgs {
736            previous_state_base64: None,
737            next_text: text.to_string(),
738            container_key: Some(field.container_key.to_string()),
739            update_id: Some("checkpoint-state".to_string()),
740        })?
741        .next_state_base64);
742    }
743
744    Err(SyncularError::protocol_message(format!(
745        "cannot build encrypted CRDT checkpoint for {} without {} state",
746        field.field, field.state_column
747    )))
748}
749
/// Builds the additional authenticated data for an encrypted CRDT row.
///
/// The seven parts are joined, in argument order, with the U+001F unit-separator
/// character and returned as UTF-8 bytes.
fn encrypted_crdt_aad(
    table: &str,
    partition_id: &str,
    stream_id: &str,
    app_table: &str,
    row_id: &str,
    field_name: &str,
    identity: &str,
) -> Vec<u8> {
    let parts = [
        table,
        partition_id,
        stream_id,
        app_table,
        row_id,
        field_name,
        identity,
    ];
    parts.join("\u{1f}").into_bytes()
}
764
765fn encrypt_payload(key: &[u8], aad: &[u8], plaintext: &[u8]) -> Result<String> {
766    let nonce = random_bytes(24)?;
767    let ciphertext = xchacha_encrypt(key, &nonce, aad, plaintext)?;
768    Ok(format!(
769        "{CRDT_CIPHERTEXT_PREFIX}{}:{}",
770        URL_SAFE_NO_PAD.encode(nonce),
771        URL_SAFE_NO_PAD.encode(ciphertext)
772    ))
773}
774
775fn decrypt_payload(key: &[u8], aad: &[u8], encoded: &str) -> Result<Vec<u8>> {
776    let rest = encoded
777        .strip_prefix(CRDT_CIPHERTEXT_PREFIX)
778        .ok_or_else(|| {
779            SyncularError::protocol_message("encrypted CRDT ciphertext has unsupported envelope")
780        })?;
781    let mut parts = rest.split(':');
782    let nonce = parts.next().unwrap_or_default();
783    let ciphertext = parts.next().unwrap_or_default();
784    if parts.next().is_some() || nonce.is_empty() || ciphertext.is_empty() {
785        return Err(SyncularError::protocol_message(
786            "encrypted CRDT ciphertext envelope is malformed",
787        ));
788    }
789    let nonce = URL_SAFE_NO_PAD
790        .decode(nonce)
791        .map_err(|err| SyncularError::protocol_message(format!("decode CRDT nonce: {err}")))?;
792    let ciphertext = URL_SAFE_NO_PAD
793        .decode(ciphertext)
794        .map_err(|err| SyncularError::protocol_message(format!("decode CRDT ciphertext: {err}")))?;
795    xchacha_decrypt(key, &nonce, aad, &ciphertext)
796}
797
/// Percent-escapes one component of a stream id.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are kept as is. Every other byte of
/// the UTF-8 encoding is written as `%XX` with upper-case hex digits, so the result
/// never contains `:`.
///
/// The output is presized and hex digits come from a lookup table, avoiding a
/// temporary `String` allocation per escaped byte.
fn escape_stream_part(value: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    // Lower bound: every input byte produces at least one output byte.
    let mut out = String::with_capacity(value.len());
    for byte in value.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
            out.push(char::from(byte));
        } else {
            out.push('%');
            out.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            out.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    out
}