1use crate::app_schema::{AppTableMetadata, CrdtYjsFieldMetadata};
2use crate::crdt_yjs::{
3 apply_yjs_updates_to_state, build_yjs_text_update, materialize_yjs_state,
4 validate_yjs_state_base64_size, validate_yjs_text_input_size,
5 validate_yjs_update_envelope_size, yjs_state_vector_base64, ApplyYjsUpdatesToStateArgs,
6 BuildYjsTextUpdateArgs, YjsFieldKind, YjsFieldRule, YjsUpdateEnvelope,
7};
8use crate::encryption::{
9 random_bytes, validate_32_bytes, xchacha_decrypt, xchacha_encrypt, FieldEncryptionContext,
10 FieldEncryptionKeyProvider, FieldEncryptionTarget, StaticFieldEncryptionKeys,
11};
12use crate::error::{Result, SyncularError};
13use crate::protocol::{PendingSyncularMutation, PullResponse, SyncChange, SyncularMutationKind};
14use base64::engine::general_purpose::URL_SAFE_NO_PAD;
15use base64::Engine as _;
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Map, Value};
18use std::collections::BTreeMap;
19use std::sync::Arc;
20use uuid::Uuid;
21
/// Name of the system table that holds encrypted CRDT update rows.
pub const CRDT_UPDATES_TABLE: &str = "sync_crdt_updates";

/// Name of the system table that holds encrypted CRDT checkpoint rows.
pub const CRDT_CHECKPOINTS_TABLE: &str = "sync_crdt_checkpoints";

/// Envelope prefix written in front of every ciphertext string by
/// `encrypt_payload` and required by `decrypt_payload`.
const CRDT_CIPHERTEXT_PREFIX: &str = "dgsync:ecrdt:1:";
25
26pub fn is_encrypted_crdt_system_table(table: &str) -> bool {
27 matches!(table, CRDT_UPDATES_TABLE | CRDT_CHECKPOINTS_TABLE)
28}
29
30pub fn encrypted_crdt_identity_column(table: &str) -> Result<&'static str> {
31 match table {
32 CRDT_UPDATES_TABLE => Ok("update_id"),
33 CRDT_CHECKPOINTS_TABLE => Ok("checkpoint_id"),
34 _ => Err(SyncularError::config(format!(
35 "unknown encrypted CRDT system table: {table}"
36 ))),
37 }
38}
39
40pub fn encrypted_crdt_normalize_row(
41 table: &str,
42 row_id: &str,
43 row: Option<&Value>,
44) -> Result<Map<String, Value>> {
45 let mut obj = match row {
46 Some(Value::Object(obj)) => obj.clone(),
47 Some(other) => {
48 return Err(SyncularError::protocol_message(format!(
49 "encrypted CRDT row for {table} is not an object: {other}"
50 )));
51 }
52 None => Map::new(),
53 };
54 let identity_column = encrypted_crdt_identity_column(table)?;
55 obj.entry(identity_column.to_string())
56 .or_insert_with(|| Value::String(row_id.to_string()));
57 required_string(&obj, "stream_id")?;
58 required_string(&obj, "app_table")?;
59 required_string(&obj, "row_id")?;
60 required_string(&obj, "field_name")?;
61 required_string(&obj, identity_column)?;
62 required_string(&obj, "key_id")?;
63 required_string(&obj, "ciphertext")?;
64 if table == CRDT_CHECKPOINTS_TABLE {
65 let covers_seq = obj
66 .get("covers_seq")
67 .and_then(Value::as_i64)
68 .ok_or_else(|| {
69 SyncularError::protocol_message(
70 "encrypted CRDT checkpoint covers_seq must be an integer",
71 )
72 })?;
73 if covers_seq < 0 {
74 return Err(SyncularError::protocol_message(
75 "encrypted CRDT checkpoint covers_seq must be non-negative",
76 ));
77 }
78 }
79 Ok(obj)
80}
81
82pub fn encrypted_crdt_scopes_json(row: &Map<String, Value>) -> Result<String> {
83 let scopes = row
84 .get("scopes")
85 .cloned()
86 .unwrap_or_else(|| Value::Object(Map::new()));
87 if !scopes.is_object() {
88 return Err(SyncularError::protocol_message(
89 "encrypted CRDT scopes must be an object",
90 ));
91 }
92 Ok(serde_json::to_string(&scopes)?)
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct StaticEncryptedCrdtConfig {
98 pub keys: BTreeMap<String, String>,
99 #[serde(default, skip_serializing_if = "Option::is_none")]
100 pub encryption_kid: Option<String>,
101 #[serde(default, skip_serializing_if = "Option::is_none")]
102 pub partition_id: Option<String>,
103}
104
105#[derive(Clone)]
106pub struct EncryptedCrdt {
107 keys: Arc<dyn FieldEncryptionKeyProvider>,
108 partition_id: String,
109}
110
111#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
112#[serde(rename_all = "camelCase")]
113pub struct EncryptedCrdtStreamStats {
114 pub update_count: i64,
115 pub checkpoint_count: i64,
116 pub checkpointable_update_count: i64,
117 pub max_server_seq: Option<i64>,
118 pub latest_checkpoint_covers_seq: Option<i64>,
119}
120
121#[derive(Debug, Clone)]
122pub struct BuildEncryptedCrdtTextUpdateArgs<'a> {
123 pub ctx: FieldEncryptionContext,
124 pub metadata: &'static AppTableMetadata,
125 pub field: &'a str,
126 pub row_id: &'a str,
127 pub existing_row: &'a Value,
128 pub next_text: &'a str,
129}
130
131#[derive(Debug, Clone)]
132pub struct BuildEncryptedCrdtYjsUpdateArgs<'a> {
133 pub ctx: FieldEncryptionContext,
134 pub metadata: &'static AppTableMetadata,
135 pub field: &'a str,
136 pub row_id: &'a str,
137 pub existing_row: &'a Value,
138 pub update: YjsUpdateEnvelope,
139}
140
141#[derive(Debug, Clone)]
142pub struct BuildEncryptedCrdtCheckpointArgs<'a> {
143 pub ctx: FieldEncryptionContext,
144 pub metadata: &'static AppTableMetadata,
145 pub field: &'a str,
146 pub row_id: &'a str,
147 pub existing_row: &'a Value,
148 pub covers_seq: i64,
149}
150
151impl EncryptedCrdt {
152 pub fn new(keys: Arc<dyn FieldEncryptionKeyProvider>) -> Self {
153 Self {
154 keys,
155 partition_id: "default".to_string(),
156 }
157 }
158
159 pub fn with_partition_id(
160 keys: Arc<dyn FieldEncryptionKeyProvider>,
161 partition_id: impl Into<String>,
162 ) -> Result<Self> {
163 let partition_id = partition_id.into();
164 if partition_id.trim().is_empty() {
165 return Err(SyncularError::config(
166 "encrypted CRDT partition_id cannot be empty",
167 ));
168 }
169 Ok(Self { keys, partition_id })
170 }
171
172 pub fn from_static_config(config: StaticEncryptedCrdtConfig) -> Result<Self> {
173 let keys =
174 StaticFieldEncryptionKeys::from_key_material(config.keys, config.encryption_kid)?;
175 Self::with_partition_id(
176 Arc::new(keys),
177 config.partition_id.unwrap_or_else(|| "default".to_string()),
178 )
179 }
180
181 pub fn from_static_config_json(config_json: &str) -> Result<Option<Self>> {
182 let trimmed = config_json.trim();
183 if trimmed.is_empty() || trimmed == "null" {
184 return Ok(None);
185 }
186 let config: StaticEncryptedCrdtConfig = serde_json::from_str(trimmed)?;
187 Ok(Some(Self::from_static_config(config)?))
188 }
189
190 pub fn partition_id(&self) -> &str {
191 &self.partition_id
192 }
193
194 pub fn transform_pull_response(&self, mut response: PullResponse) -> Result<PullResponse> {
195 for sub in &mut response.subscriptions {
196 if let Some(snapshots) = &mut sub.snapshots {
197 for snapshot in snapshots {
198 if !is_encrypted_crdt_system_table(&snapshot.table) {
199 continue;
200 }
201 for row in &mut snapshot.rows {
202 *row = self.decrypt_system_row_value(&snapshot.table, row.clone())?;
203 }
204 }
205 }
206 for commit in &mut sub.commits {
207 for change in &mut commit.changes {
208 if !is_encrypted_crdt_system_table(&change.table) || change.op != "upsert" {
209 continue;
210 }
211 if let Some(row_json) = change.row_json.take() {
212 change.row_json =
213 Some(self.decrypt_system_row_value(&change.table, row_json)?);
214 }
215 }
216 }
217 }
218 Ok(response)
219 }
220
221 pub fn transform_change(&self, mut change: SyncChange) -> Result<SyncChange> {
222 if is_encrypted_crdt_system_table(&change.table) && change.op == "upsert" {
223 if let Some(row_json) = change.row_json.take() {
224 change.row_json = Some(self.decrypt_system_row_value(&change.table, row_json)?);
225 }
226 }
227 Ok(change)
228 }
229
230 pub fn build_text_update_mutation(
231 &self,
232 args: BuildEncryptedCrdtTextUpdateArgs<'_>,
233 ) -> Result<PendingSyncularMutation> {
234 validate_yjs_text_input_size(args.next_text)?;
235 let field = encrypted_field_metadata(args.metadata, args.field)?;
236 if field.kind != "text" {
237 return Err(SyncularError::config(format!(
238 "encrypted CRDT text updates require a text Yjs field, got {}.{} kind {}",
239 args.metadata.name, field.field, field.kind
240 )));
241 }
242 let existing = args.existing_row.as_object().ok_or_else(|| {
243 SyncularError::protocol_message("encrypted CRDT update existing_row must be an object")
244 })?;
245 let previous_state_base64 = existing
246 .get(field.state_column)
247 .and_then(Value::as_str)
248 .filter(|value| !value.is_empty())
249 .map(str::to_string);
250 let requires_state_vector_base64 = previous_state_base64
251 .as_deref()
252 .map(|state| yjs_state_vector_base64(Some(state)))
253 .transpose()?;
254 let mut update = build_yjs_text_update(BuildYjsTextUpdateArgs {
255 previous_state_base64,
256 next_text: args.next_text.to_string(),
257 container_key: Some(field.container_key.to_string()),
258 update_id: None,
259 })?;
260 update.update.requires_state_vector_base64 = requires_state_vector_base64;
261 self.build_yjs_update_mutation(BuildEncryptedCrdtYjsUpdateArgs {
262 ctx: args.ctx,
263 metadata: args.metadata,
264 field: args.field,
265 row_id: args.row_id,
266 existing_row: args.existing_row,
267 update: update.update,
268 })
269 }
270
271 pub fn build_yjs_update_mutation(
272 &self,
273 args: BuildEncryptedCrdtYjsUpdateArgs<'_>,
274 ) -> Result<PendingSyncularMutation> {
275 validate_yjs_update_envelope_size(&args.update)?;
276 if args.update.update_id.trim().is_empty() {
277 return Err(SyncularError::protocol_message(
278 "encrypted CRDT update.updateId must be a non-empty string",
279 ));
280 }
281 if args.update.update_base64.trim().is_empty() {
282 return Err(SyncularError::protocol_message(
283 "encrypted CRDT update.updateBase64 must be a non-empty base64 string",
284 ));
285 }
286 let field = encrypted_field_metadata(args.metadata, args.field)?;
287 let existing = args.existing_row.as_object().ok_or_else(|| {
288 SyncularError::protocol_message("encrypted CRDT update existing_row must be an object")
289 })?;
290 let scopes = scopes_from_app_row(args.metadata, existing)?;
291 let stream_id = encrypted_crdt_stream_id(args.metadata.name, args.row_id, field.field);
292 let target = FieldEncryptionTarget {
293 scope: args.metadata.name.to_string(),
294 table: args.metadata.name.to_string(),
295 row_id: args.row_id.to_string(),
296 field: field.field.to_string(),
297 };
298 let key_id = self.keys.encryption_kid(&args.ctx, &target)?;
299 let key = self.keys.get_key(&key_id)?;
300 validate_32_bytes("encrypted CRDT key", &key)?;
301 let plaintext = serde_json::to_vec(&EncryptedCrdtPlaintext::YjsUpdateV1 {
302 update_base64: args.update.update_base64.clone(),
303 requires_state_vector_base64: args.update.requires_state_vector_base64.clone(),
304 })?;
305 let aad = encrypted_crdt_aad(
306 CRDT_UPDATES_TABLE,
307 &self.partition_id,
308 &stream_id,
309 args.metadata.name,
310 args.row_id,
311 field.field,
312 &args.update.update_id,
313 );
314 let ciphertext = encrypt_payload(&key, &aad, &plaintext)?;
315 let payload = json!({
316 "partition_id": self.partition_id,
317 "stream_id": stream_id,
318 "app_table": args.metadata.name,
319 "row_id": args.row_id,
320 "field_name": field.field,
321 "update_id": args.update.update_id,
322 "actor_id": args.ctx.actor_id,
323 "client_id": args.ctx.client_id,
324 "key_id": key_id,
325 "ciphertext": ciphertext,
326 "scopes": Value::Object(scopes.clone())
327 });
328 let mut local_row = payload.clone();
329 local_row["update_base64"] = Value::String(args.update.update_base64);
330 if let Some(required) = &args.update.requires_state_vector_base64 {
331 local_row["requires_state_vector_base64"] = Value::String(required.clone());
332 }
333
334 Ok(PendingSyncularMutation {
335 kind: SyncularMutationKind::Upsert,
336 table: CRDT_UPDATES_TABLE.to_string(),
337 row_id: payload["update_id"].as_str().unwrap().to_string(),
338 payload: Some(payload),
339 base_version: None,
340 local_row: Some(local_row),
341 })
342 }
343
344 pub fn build_checkpoint_mutation(
345 &self,
346 args: BuildEncryptedCrdtCheckpointArgs<'_>,
347 ) -> Result<PendingSyncularMutation> {
348 if args.covers_seq < 0 {
349 return Err(SyncularError::config(
350 "encrypted CRDT checkpoint covers_seq must be non-negative",
351 ));
352 }
353 let field = encrypted_field_metadata(args.metadata, args.field)?;
354 let existing = args.existing_row.as_object().ok_or_else(|| {
355 SyncularError::protocol_message(
356 "encrypted CRDT checkpoint existing_row must be an object",
357 )
358 })?;
359 let state_base64 = checkpoint_state_base64(existing, field)?;
360 validate_yjs_state_base64_size(&state_base64)?;
361 let scopes = scopes_from_app_row(args.metadata, existing)?;
362 let stream_id = encrypted_crdt_stream_id(args.metadata.name, args.row_id, field.field);
363 let target = FieldEncryptionTarget {
364 scope: args.metadata.name.to_string(),
365 table: args.metadata.name.to_string(),
366 row_id: args.row_id.to_string(),
367 field: field.field.to_string(),
368 };
369 let key_id = self.keys.encryption_kid(&args.ctx, &target)?;
370 let key = self.keys.get_key(&key_id)?;
371 validate_32_bytes("encrypted CRDT key", &key)?;
372 let checkpoint_id = Uuid::new_v4().to_string();
373 let plaintext = serde_json::to_vec(&EncryptedCrdtPlaintext::YjsStateV1 {
374 state_base64: state_base64.clone(),
375 })?;
376 let aad = encrypted_crdt_aad(
377 CRDT_CHECKPOINTS_TABLE,
378 &self.partition_id,
379 &stream_id,
380 args.metadata.name,
381 args.row_id,
382 field.field,
383 &checkpoint_id,
384 );
385 let ciphertext = encrypt_payload(&key, &aad, &plaintext)?;
386 let payload = json!({
387 "partition_id": self.partition_id,
388 "stream_id": stream_id,
389 "app_table": args.metadata.name,
390 "row_id": args.row_id,
391 "field_name": field.field,
392 "checkpoint_id": checkpoint_id,
393 "covers_seq": args.covers_seq,
394 "actor_id": args.ctx.actor_id,
395 "client_id": args.ctx.client_id,
396 "key_id": key_id,
397 "ciphertext": ciphertext,
398 "scopes": Value::Object(scopes.clone())
399 });
400 let mut local_row = payload.clone();
401 local_row["state_base64"] = Value::String(state_base64);
402
403 Ok(PendingSyncularMutation {
404 kind: SyncularMutationKind::Upsert,
405 table: CRDT_CHECKPOINTS_TABLE.to_string(),
406 row_id: payload["checkpoint_id"].as_str().unwrap().to_string(),
407 payload: Some(payload),
408 base_version: None,
409 local_row: Some(local_row),
410 })
411 }
412
413 fn decrypt_system_row_value(&self, table: &str, value: Value) -> Result<Value> {
414 let Value::Object(mut row) = value else {
415 return Ok(value);
416 };
417 self.decrypt_system_row_in_place(table, &mut row)?;
418 Ok(Value::Object(row))
419 }
420
421 fn decrypt_system_row_in_place(&self, table: &str, row: &mut Map<String, Value>) -> Result<()> {
422 if table == CRDT_UPDATES_TABLE && row.get("update_base64").is_some() {
423 return Ok(());
424 }
425 if table == CRDT_CHECKPOINTS_TABLE && row.get("state_base64").is_some() {
426 return Ok(());
427 }
428
429 let identity_column = encrypted_crdt_identity_column(table)?;
430 let partition_id = row
431 .get("partition_id")
432 .and_then(Value::as_str)
433 .unwrap_or("default");
434 let stream_id = required_string(row, "stream_id")?;
435 let app_table = required_string(row, "app_table")?;
436 let row_id = required_string(row, "row_id")?;
437 let field_name = required_string(row, "field_name")?;
438 let identity = required_string(row, identity_column)?;
439 let key_id = required_string(row, "key_id")?;
440 let ciphertext = required_string(row, "ciphertext")?;
441 let key = self.keys.get_key(&key_id)?;
442 validate_32_bytes("encrypted CRDT key", &key)?;
443 let aad = encrypted_crdt_aad(
444 table,
445 &partition_id,
446 &stream_id,
447 &app_table,
448 &row_id,
449 &field_name,
450 &identity,
451 );
452 let plaintext = decrypt_payload(&key, &aad, &ciphertext)?;
453 match serde_json::from_slice::<EncryptedCrdtPlaintext>(&plaintext)? {
454 EncryptedCrdtPlaintext::YjsUpdateV1 {
455 update_base64,
456 requires_state_vector_base64,
457 } => {
458 if table != CRDT_UPDATES_TABLE {
459 return Err(SyncularError::protocol_message(
460 "encrypted CRDT update plaintext cannot be stored in checkpoint table",
461 ));
462 }
463 row.insert("update_base64".to_string(), Value::String(update_base64));
464 if let Some(required) = requires_state_vector_base64 {
465 row.insert(
466 "requires_state_vector_base64".to_string(),
467 Value::String(required),
468 );
469 }
470 }
471 EncryptedCrdtPlaintext::YjsStateV1 { state_base64 } => {
472 if table != CRDT_CHECKPOINTS_TABLE {
473 return Err(SyncularError::protocol_message(
474 "encrypted CRDT checkpoint plaintext cannot be stored in update table",
475 ));
476 }
477 row.insert("state_base64".to_string(), Value::String(state_base64));
478 }
479 }
480 Ok(())
481 }
482}
483
484pub fn encrypted_crdt_stream_id(table: &str, row_id: &str, field: &str) -> String {
485 format!(
486 "{}:{}:{}",
487 escape_stream_part(table),
488 escape_stream_part(row_id),
489 escape_stream_part(field)
490 )
491}
492
493pub fn is_encrypted_update_log_field(field: &CrdtYjsFieldMetadata) -> bool {
494 field.sync_mode == "encrypted-update-log"
495}
496
497pub fn encrypted_field_metadata(
498 metadata: &AppTableMetadata,
499 field_name: &str,
500) -> Result<&'static CrdtYjsFieldMetadata> {
501 metadata
502 .crdt_yjs_fields
503 .iter()
504 .find(|field| field.field == field_name && is_encrypted_update_log_field(field))
505 .ok_or_else(|| {
506 SyncularError::config(format!(
507 "no encrypted CRDT Yjs field metadata for {}.{field_name}",
508 metadata.name
509 ))
510 })
511}
512
513pub fn encrypted_crdt_plaintext_update_base64(row: &Map<String, Value>) -> Option<String> {
514 row.get("update_base64")
515 .and_then(Value::as_str)
516 .filter(|value| !value.is_empty())
517 .map(str::to_string)
518}
519
520pub fn encrypted_crdt_required_state_vector_base64(row: &Map<String, Value>) -> Option<String> {
521 row.get("requires_state_vector_base64")
522 .and_then(Value::as_str)
523 .filter(|value| !value.is_empty())
524 .map(str::to_string)
525}
526
527pub fn encrypted_crdt_plaintext_state_base64(row: &Map<String, Value>) -> Option<String> {
528 row.get("state_base64")
529 .and_then(Value::as_str)
530 .filter(|value| !value.is_empty())
531 .map(str::to_string)
532}
533
534pub fn apply_encrypted_crdt_plaintext_to_row(
535 metadata: &'static AppTableMetadata,
536 field_name: &str,
537 app_row_id: &str,
538 system_table: &str,
539 system_row: &Map<String, Value>,
540 current_row: Option<Value>,
541) -> Result<Option<Value>> {
542 let field = encrypted_field_metadata(metadata, field_name)?;
543 let mut app_row = current_row
544 .and_then(|row| row.as_object().cloned())
545 .unwrap_or_default();
546 app_row.insert(
547 metadata.primary_key_column.to_string(),
548 Value::String(app_row_id.to_string()),
549 );
550 merge_scope_columns(metadata, system_row, &mut app_row)?;
551
552 let previous_state_base64 = app_row
553 .get(field.state_column)
554 .and_then(Value::as_str)
555 .filter(|value| !value.is_empty())
556 .map(str::to_string);
557 let next_state_base64 = match system_table {
558 CRDT_UPDATES_TABLE => {
559 let Some(update_base64) = encrypted_crdt_plaintext_update_base64(system_row) else {
560 return Ok(None);
561 };
562 apply_yjs_updates_to_state(ApplyYjsUpdatesToStateArgs {
563 previous_state_base64,
564 updates: vec![YjsUpdateEnvelope {
565 update_id: system_row
566 .get("update_id")
567 .and_then(Value::as_str)
568 .unwrap_or("encrypted-crdt-update")
569 .to_string(),
570 update_base64,
571 requires_state_vector_base64: encrypted_crdt_required_state_vector_base64(
572 system_row,
573 ),
574 }],
575 })?
576 .next_state_base64
577 }
578 CRDT_CHECKPOINTS_TABLE => {
579 let Some(state_base64) = encrypted_crdt_plaintext_state_base64(system_row) else {
580 return Ok(None);
581 };
582 state_base64
583 }
584 _ => return Ok(None),
585 };
586 let rule = yjs_rule_from_metadata(metadata.name, field)?;
587 app_row.insert(
588 field.field.to_string(),
589 materialize_yjs_state(&next_state_base64, &rule)?,
590 );
591 app_row.insert(
592 field.state_column.to_string(),
593 Value::String(next_state_base64),
594 );
595 fill_required_app_defaults(metadata, &mut app_row);
596 Ok(Some(Value::Object(app_row)))
597}
598
599pub fn encrypted_crdt_row_matches_scopes(
600 row: &Map<String, Value>,
601 scopes: &Map<String, Value>,
602) -> bool {
603 if scopes.is_empty() {
604 return true;
605 }
606 let Some(stored_scopes) = row.get("scopes").and_then(Value::as_object) else {
607 return false;
608 };
609 scopes.iter().all(|(key, requested)| {
610 let Some(stored) = stored_scopes.get(key) else {
611 return false;
612 };
613 if let Value::Array(values) = requested {
614 return values.iter().any(|value| value == stored);
615 }
616 stored == requested
617 })
618}
619
620fn required_string(row: &Map<String, Value>, field: &str) -> Result<String> {
621 row.get(field)
622 .and_then(Value::as_str)
623 .filter(|value| !value.is_empty())
624 .map(str::to_string)
625 .ok_or_else(|| {
626 SyncularError::protocol_message(format!(
627 "encrypted CRDT payload field {field} must be a non-empty string"
628 ))
629 })
630}
631
632#[derive(Debug, Clone, Serialize, Deserialize)]
633#[serde(tag = "type")]
634enum EncryptedCrdtPlaintext {
635 #[serde(rename = "yjs-update-v1", rename_all = "camelCase")]
636 YjsUpdateV1 {
637 update_base64: String,
638 #[serde(default, skip_serializing_if = "Option::is_none")]
639 requires_state_vector_base64: Option<String>,
640 },
641 #[serde(rename = "yjs-state-v1", rename_all = "camelCase")]
642 YjsStateV1 { state_base64: String },
643}
644
645fn scopes_from_app_row(
646 metadata: &AppTableMetadata,
647 row: &Map<String, Value>,
648) -> Result<Map<String, Value>> {
649 let mut scopes = Map::new();
650 for scope in metadata.scopes {
651 if let Some(value) = row.get(scope.column) {
652 scopes.insert(scope.name.to_string(), value.clone());
653 } else if scope.required {
654 return Err(SyncularError::protocol_message(format!(
655 "cannot build encrypted CRDT update for {} without scope column {}",
656 metadata.name, scope.column
657 )));
658 }
659 }
660 Ok(scopes)
661}
662
663fn merge_scope_columns(
664 metadata: &AppTableMetadata,
665 system_row: &Map<String, Value>,
666 app_row: &mut Map<String, Value>,
667) -> Result<()> {
668 let scopes = system_row
669 .get("scopes")
670 .and_then(Value::as_object)
671 .cloned()
672 .unwrap_or_default();
673 for scope in metadata.scopes {
674 if !app_row.contains_key(scope.column) {
675 if let Some(value) = scopes.get(scope.name) {
676 app_row.insert(scope.column.to_string(), value.clone());
677 }
678 }
679 }
680 Ok(())
681}
682
683fn fill_required_app_defaults(metadata: &AppTableMetadata, row: &mut Map<String, Value>) {
684 for column in metadata.columns {
685 if row.contains_key(column.name) {
686 continue;
687 }
688 let value = match column.type_family {
689 "integer" => Value::Number(0.into()),
690 "real" => json!(0.0),
691 "text" if column.notnull_required => Value::String(String::new()),
692 _ => Value::Null,
693 };
694 row.insert(column.name.to_string(), value);
695 }
696}
697
698fn yjs_rule_from_metadata(table: &str, field: &CrdtYjsFieldMetadata) -> Result<YjsFieldRule> {
699 Ok(YjsFieldRule {
700 table: table.to_string(),
701 field: field.field.to_string(),
702 state_column: field.state_column.to_string(),
703 container_key: Some(field.container_key.to_string()),
704 row_id_field: Some(field.row_id_field.to_string()),
705 kind: match field.kind {
706 "text" => YjsFieldKind::Text,
707 "xml-fragment" => YjsFieldKind::XmlFragment,
708 "prosemirror" => YjsFieldKind::Prosemirror,
709 other => {
710 return Err(SyncularError::config(format!(
711 "unsupported encrypted CRDT Yjs field kind: {other}"
712 )));
713 }
714 },
715 })
716}
717
718fn checkpoint_state_base64(
719 existing: &Map<String, Value>,
720 field: &CrdtYjsFieldMetadata,
721) -> Result<String> {
722 if let Some(state) = existing
723 .get(field.state_column)
724 .and_then(Value::as_str)
725 .filter(|state| !state.is_empty())
726 {
727 return Ok(state.to_string());
728 }
729
730 if field.kind == "text" {
731 let text = existing
732 .get(field.field)
733 .and_then(Value::as_str)
734 .unwrap_or_default();
735 return Ok(build_yjs_text_update(BuildYjsTextUpdateArgs {
736 previous_state_base64: None,
737 next_text: text.to_string(),
738 container_key: Some(field.container_key.to_string()),
739 update_id: Some("checkpoint-state".to_string()),
740 })?
741 .next_state_base64);
742 }
743
744 Err(SyncularError::protocol_message(format!(
745 "cannot build encrypted CRDT checkpoint for {} without {} state",
746 field.field, field.state_column
747 )))
748}
749
/// Builds the associated data that binds a ciphertext to its row.
///
/// The seven parts are joined, in parameter order, with the ASCII unit
/// separator (`U+001F`) and returned as UTF-8 bytes.
///
/// The parts are joined verbatim: a part that itself contains `U+001F` is not
/// escaped here.
fn encrypted_crdt_aad(
    table: &str,
    partition_id: &str,
    stream_id: &str,
    app_table: &str,
    row_id: &str,
    field_name: &str,
    identity: &str,
) -> Vec<u8> {
    let parts = [
        table,
        partition_id,
        stream_id,
        app_table,
        row_id,
        field_name,
        identity,
    ];
    parts.join("\u{1f}").into_bytes()
}
764
765fn encrypt_payload(key: &[u8], aad: &[u8], plaintext: &[u8]) -> Result<String> {
766 let nonce = random_bytes(24)?;
767 let ciphertext = xchacha_encrypt(key, &nonce, aad, plaintext)?;
768 Ok(format!(
769 "{CRDT_CIPHERTEXT_PREFIX}{}:{}",
770 URL_SAFE_NO_PAD.encode(nonce),
771 URL_SAFE_NO_PAD.encode(ciphertext)
772 ))
773}
774
775fn decrypt_payload(key: &[u8], aad: &[u8], encoded: &str) -> Result<Vec<u8>> {
776 let rest = encoded
777 .strip_prefix(CRDT_CIPHERTEXT_PREFIX)
778 .ok_or_else(|| {
779 SyncularError::protocol_message("encrypted CRDT ciphertext has unsupported envelope")
780 })?;
781 let mut parts = rest.split(':');
782 let nonce = parts.next().unwrap_or_default();
783 let ciphertext = parts.next().unwrap_or_default();
784 if parts.next().is_some() || nonce.is_empty() || ciphertext.is_empty() {
785 return Err(SyncularError::protocol_message(
786 "encrypted CRDT ciphertext envelope is malformed",
787 ));
788 }
789 let nonce = URL_SAFE_NO_PAD
790 .decode(nonce)
791 .map_err(|err| SyncularError::protocol_message(format!("decode CRDT nonce: {err}")))?;
792 let ciphertext = URL_SAFE_NO_PAD
793 .decode(ciphertext)
794 .map_err(|err| SyncularError::protocol_message(format!("decode CRDT ciphertext: {err}")))?;
795 xchacha_decrypt(key, &nonce, aad, &ciphertext)
796}
797
/// Percent-encodes one component of a stream id.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through unchanged.
/// Every other byte of the UTF-8 input is written as `%XX` with two uppercase
/// hexadecimal digits, so the result is always plain ASCII and never contains
/// a raw `:`.
fn escape_stream_part(value: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    // The output is at least as long as the input; reserving that much avoids
    // most regrowth, and the hex table avoids a temporary `String` per byte.
    let mut out = String::with_capacity(value.len());
    for byte in value.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
            out.push(char::from(byte));
        } else {
            out.push('%');
            out.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            out.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    out
}