1use crate::app_schema::{AppTableMetadata, CrdtYjsFieldMetadata};
2#[cfg(feature = "crdt-yjs")]
3use crate::error::FULL_SNAPSHOT_RESYNC_REQUIRED;
4use crate::error::{Result, SyncularError};
5use crate::limits::{
6 MAX_CRDT_REQUEST_JSON_BYTES, MAX_CRDT_STATE_BASE64_BYTES, MAX_CRDT_STATE_VECTOR_BASE64_BYTES,
7 MAX_CRDT_TEXT_BYTES, MAX_CRDT_UPDATE_BASE64_BYTES,
8};
9#[cfg(feature = "crdt-yjs")]
10use crate::protocol::random_syncular_id;
11use crate::protocol::{validate_payload_bytes, SyncOperation};
12#[cfg(feature = "crdt-yjs")]
13use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
14use serde::{Deserialize, Serialize};
15use serde_json::{Map, Value};
16use std::collections::{BTreeMap, BTreeSet};
17#[cfg(feature = "crdt-yjs")]
18use yrs::updates::decoder::Decode;
19#[cfg(feature = "crdt-yjs")]
20use yrs::updates::encoder::Encode;
21#[cfg(feature = "crdt-yjs")]
22use yrs::{Doc, GetString, ReadTxn, StateVector, Text, Transact, Update};
23
24pub const YJS_PAYLOAD_KEY: &str = "__yjs";
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "kebab-case")]
28pub enum YjsFieldKind {
29 Text,
30 XmlFragment,
31 Prosemirror,
32}
33
34impl Default for YjsFieldKind {
35 fn default() -> Self {
36 Self::Text
37 }
38}
39
40impl YjsFieldKind {
41 pub fn from_metadata(value: &str) -> Result<Self> {
42 match value {
43 "text" => Ok(Self::Text),
44 "xml-fragment" => Ok(Self::XmlFragment),
45 "prosemirror" => Ok(Self::Prosemirror),
46 other => Err(SyncularError::config(format!(
47 "unsupported Yjs field kind: {other}"
48 ))),
49 }
50 }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct YjsFieldRule {
56 pub table: String,
57 pub field: String,
58 pub state_column: String,
59 #[serde(default)]
60 pub container_key: Option<String>,
61 #[serde(default)]
62 pub row_id_field: Option<String>,
63 #[serde(default)]
64 pub kind: YjsFieldKind,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct YjsUpdateEnvelope {
70 pub update_id: String,
71 pub update_base64: String,
72 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub requires_state_vector_base64: Option<String>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "camelCase")]
78pub struct BuildYjsTextUpdateArgs {
79 #[serde(default)]
80 pub previous_state_base64: Option<String>,
81 pub next_text: String,
82 #[serde(default)]
83 pub container_key: Option<String>,
84 #[serde(default)]
85 pub update_id: Option<String>,
86}
87
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(rename_all = "camelCase")]
90pub struct BuildYjsTextUpdateResult {
91 pub update: YjsUpdateEnvelope,
92 pub next_state_base64: String,
93 pub next_text: String,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97#[serde(rename_all = "camelCase")]
98pub struct ApplyYjsTextUpdatesArgs {
99 #[serde(default)]
100 pub previous_state_base64: Option<String>,
101 pub updates: Vec<YjsUpdateEnvelope>,
102 #[serde(default)]
103 pub container_key: Option<String>,
104}
105
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "camelCase")]
108pub struct ApplyYjsTextUpdatesResult {
109 pub next_state_base64: String,
110 pub text: String,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
114#[serde(rename_all = "camelCase")]
115pub struct ApplyYjsUpdatesToStateArgs {
116 #[serde(default)]
117 pub previous_state_base64: Option<String>,
118 pub updates: Vec<YjsUpdateEnvelope>,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
122#[serde(rename_all = "camelCase")]
123pub struct ApplyYjsUpdatesToStateResult {
124 pub next_state_base64: String,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[serde(rename_all = "camelCase")]
129pub struct ApplyYjsEnvelopeToPayloadArgs {
130 pub table: String,
131 #[serde(default)]
132 pub row_id: Option<String>,
133 pub payload: Value,
134 #[serde(default)]
135 pub existing_row: Option<Value>,
136 pub rules: Vec<YjsFieldRule>,
137 #[serde(default)]
138 pub envelope_key: Option<String>,
139 #[serde(default)]
140 pub strict: Option<bool>,
141 #[serde(default)]
142 pub strip_envelope: Option<bool>,
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
146#[serde(rename_all = "camelCase")]
147pub struct ApplyYjsEnvelopeToPayloadResult {
148 pub payload: Value,
149}
150
151pub fn validate_crdt_request_json_size(request_json: &str) -> Result<()> {
152 validate_payload_bytes(
153 "maxCrdtRequestJsonBytes",
154 request_json.len(),
155 MAX_CRDT_REQUEST_JSON_BYTES,
156 "Syncular CRDT request JSON exceeds the configured limit",
157 )
158}
159
160pub fn validate_yjs_update_envelope_size(update: &YjsUpdateEnvelope) -> Result<()> {
161 validate_payload_bytes(
162 "maxCrdtUpdateBase64Bytes",
163 update.update_base64.len(),
164 MAX_CRDT_UPDATE_BASE64_BYTES,
165 "Syncular CRDT updateBase64 exceeds the configured limit",
166 )?;
167 if let Some(required) = &update.requires_state_vector_base64 {
168 validate_payload_bytes(
169 "maxCrdtStateVectorBase64Bytes",
170 required.len(),
171 MAX_CRDT_STATE_VECTOR_BASE64_BYTES,
172 "Syncular CRDT requiresStateVectorBase64 exceeds the configured limit",
173 )?;
174 }
175 Ok(())
176}
177
178pub fn validate_yjs_update_envelope_list_size(updates: &[YjsUpdateEnvelope]) -> Result<()> {
179 for update in updates {
180 validate_yjs_update_envelope_size(update)?;
181 }
182 Ok(())
183}
184
185pub fn validate_yjs_state_base64_size(state_base64: &str) -> Result<()> {
186 validate_payload_bytes(
187 "maxCrdtStateBase64Bytes",
188 state_base64.len(),
189 MAX_CRDT_STATE_BASE64_BYTES,
190 "Syncular CRDT stateBase64 exceeds the configured limit",
191 )
192}
193
194pub fn validate_yjs_text_input_size(next_text: &str) -> Result<()> {
195 validate_payload_bytes(
196 "maxCrdtTextBytes",
197 next_text.len(),
198 MAX_CRDT_TEXT_BYTES,
199 "Syncular CRDT text exceeds the configured limit",
200 )
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize)]
204#[serde(rename_all = "camelCase")]
205pub struct MaterializeYjsRowArgs {
206 pub table: String,
207 #[serde(default)]
208 pub row_id: Option<String>,
209 pub row: Value,
210 pub rules: Vec<YjsFieldRule>,
211 #[serde(default)]
212 pub envelope_key: Option<String>,
213 #[serde(default)]
214 pub strip_envelope: Option<bool>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
218#[serde(rename_all = "camelCase")]
219pub struct MaterializeYjsRowResult {
220 pub row: Value,
221}
222
223#[cfg(feature = "crdt-yjs")]
224pub fn build_yjs_text_update(args: BuildYjsTextUpdateArgs) -> Result<BuildYjsTextUpdateResult> {
225 validate_yjs_text_input_size(&args.next_text)?;
226 let container_key = args.container_key.unwrap_or_else(|| "text".to_string());
227 let doc = create_doc_from_state(args.previous_state_base64.as_deref())?;
228 let before = {
229 let txn = doc.transact();
230 txn.state_vector()
231 };
232 patch_text(&doc, &container_key, &args.next_text);
233 let text_ref = doc.get_or_insert_text(container_key);
234 let txn = doc.transact();
235 let update = txn.encode_state_as_update_v1(&before);
236 let next_state = txn.encode_state_as_update_v1(&StateVector::default());
237 let next_text = text_ref.get_string(&txn);
238
239 let next_state_base64 = encode_base64(&next_state);
240 validate_yjs_state_base64_size(&next_state_base64)?;
241 Ok(BuildYjsTextUpdateResult {
242 update: YjsUpdateEnvelope {
243 update_id: args.update_id.unwrap_or_else(random_syncular_id),
244 update_base64: encode_base64(&update),
245 requires_state_vector_base64: None,
246 },
247 next_state_base64,
248 next_text,
249 })
250}
251
252#[cfg(not(feature = "crdt-yjs"))]
253pub fn build_yjs_text_update(_args: BuildYjsTextUpdateArgs) -> Result<BuildYjsTextUpdateResult> {
254 Err(crdt_yjs_feature_disabled())
255}
256
257#[cfg(feature = "crdt-yjs")]
258pub fn apply_yjs_text_updates(args: ApplyYjsTextUpdatesArgs) -> Result<ApplyYjsTextUpdatesResult> {
259 let container_key = args.container_key.unwrap_or_else(|| "text".to_string());
260 let doc = create_doc_from_state(args.previous_state_base64.as_deref())?;
261 apply_updates(&doc, &args.updates)?;
262 let text_ref = doc.get_or_insert_text(container_key);
263 let txn = doc.transact();
264 let text = text_ref.get_string(&txn);
265 let next_state = txn.encode_state_as_update_v1(&StateVector::default());
266 let next_state_base64 = encode_base64(&next_state);
267 validate_yjs_state_base64_size(&next_state_base64)?;
268 Ok(ApplyYjsTextUpdatesResult {
269 next_state_base64,
270 text,
271 })
272}
273
274#[cfg(not(feature = "crdt-yjs"))]
275pub fn apply_yjs_text_updates(_args: ApplyYjsTextUpdatesArgs) -> Result<ApplyYjsTextUpdatesResult> {
276 Err(crdt_yjs_feature_disabled())
277}
278
279#[cfg(feature = "crdt-yjs")]
280pub fn apply_yjs_updates_to_state(
281 args: ApplyYjsUpdatesToStateArgs,
282) -> Result<ApplyYjsUpdatesToStateResult> {
283 let doc = create_doc_from_state(args.previous_state_base64.as_deref())?;
284 apply_updates(&doc, &args.updates)?;
285 let next_state = {
286 let txn = doc.transact();
287 txn.encode_state_as_update_v1(&StateVector::default())
288 };
289 let next_state_base64 = encode_base64(&next_state);
290 validate_yjs_state_base64_size(&next_state_base64)?;
291 Ok(ApplyYjsUpdatesToStateResult { next_state_base64 })
292}
293
294#[cfg(not(feature = "crdt-yjs"))]
295pub fn apply_yjs_updates_to_state(
296 _args: ApplyYjsUpdatesToStateArgs,
297) -> Result<ApplyYjsUpdatesToStateResult> {
298 Err(crdt_yjs_feature_disabled())
299}
300
301#[cfg(feature = "crdt-yjs")]
302pub fn materialize_yjs_state(state_base64: &str, rule: &YjsFieldRule) -> Result<Value> {
303 let doc = create_doc_from_state(Some(state_base64))?;
304 materialize_rule_value(&doc, rule)
305}
306
307#[cfg(not(feature = "crdt-yjs"))]
308pub fn materialize_yjs_state(_state_base64: &str, _rule: &YjsFieldRule) -> Result<Value> {
309 Err(crdt_yjs_feature_disabled())
310}
311
312#[cfg(feature = "crdt-yjs")]
313pub fn yjs_state_vector_base64(state_base64: Option<&str>) -> Result<String> {
314 let doc = create_doc_from_state(state_base64)?;
315 let txn = doc.transact();
316 let state_vector_base64 = encode_base64(&txn.state_vector().encode_v1());
317 validate_payload_bytes(
318 "maxCrdtStateVectorBase64Bytes",
319 state_vector_base64.len(),
320 MAX_CRDT_STATE_VECTOR_BASE64_BYTES,
321 "Syncular CRDT stateVectorBase64 exceeds the configured limit",
322 )?;
323 Ok(state_vector_base64)
324}
325
326#[cfg(not(feature = "crdt-yjs"))]
327pub fn yjs_state_vector_base64(_state_base64: Option<&str>) -> Result<String> {
328 Err(crdt_yjs_feature_disabled())
329}
330
331pub fn apply_yjs_envelope_to_payload(
332 args: ApplyYjsEnvelopeToPayloadArgs,
333) -> Result<ApplyYjsEnvelopeToPayloadResult> {
334 let payload = transform_payload(
335 &args.table,
336 args.row_id.as_deref(),
337 args.payload,
338 args.existing_row.as_ref(),
339 &args.rules,
340 args.envelope_key.as_deref().unwrap_or(YJS_PAYLOAD_KEY),
341 args.strict.unwrap_or(true),
342 args.strip_envelope.unwrap_or(true),
343 )?;
344 Ok(ApplyYjsEnvelopeToPayloadResult { payload })
345}
346
347pub fn materialize_yjs_row(args: MaterializeYjsRowArgs) -> Result<MaterializeYjsRowResult> {
348 let row = materialize_row(
349 &args.table,
350 args.row_id.as_deref(),
351 args.row,
352 &args.rules,
353 args.envelope_key.as_deref().unwrap_or(YJS_PAYLOAD_KEY),
354 args.strip_envelope.unwrap_or(true),
355 )?;
356 Ok(MaterializeYjsRowResult { row })
357}
358
359pub fn build_yjs_text_update_json(args_json: &str) -> Result<String> {
360 validate_crdt_request_json_size(args_json)?;
361 let args: BuildYjsTextUpdateArgs = serde_json::from_str(args_json)?;
362 Ok(serde_json::to_string(&build_yjs_text_update(args)?)?)
363}
364
365pub fn apply_yjs_text_updates_json(args_json: &str) -> Result<String> {
366 validate_crdt_request_json_size(args_json)?;
367 let args: ApplyYjsTextUpdatesArgs = serde_json::from_str(args_json)?;
368 Ok(serde_json::to_string(&apply_yjs_text_updates(args)?)?)
369}
370
371pub fn apply_yjs_envelope_to_payload_json(args_json: &str) -> Result<String> {
372 validate_crdt_request_json_size(args_json)?;
373 let args: ApplyYjsEnvelopeToPayloadArgs = serde_json::from_str(args_json)?;
374 Ok(serde_json::to_string(&apply_yjs_envelope_to_payload(
375 args,
376 )?)?)
377}
378
379pub fn materialize_yjs_row_json(args_json: &str) -> Result<String> {
380 validate_crdt_request_json_size(args_json)?;
381 let args: MaterializeYjsRowArgs = serde_json::from_str(args_json)?;
382 Ok(serde_json::to_string(&materialize_yjs_row(args)?)?)
383}
384
385pub fn transform_operation_payload_for_metadata(
386 operation: &mut SyncOperation,
387 existing_row: Option<&Value>,
388 metadata: &AppTableMetadata,
389) -> Result<()> {
390 let rules = rules_from_metadata(metadata)?;
391 if operation.op != "upsert" || rules.is_empty() {
392 return Ok(());
393 }
394 let Some(payload) = operation.payload.take() else {
395 return Ok(());
396 };
397 operation.payload = Some(transform_payload(
398 metadata.name,
399 Some(&operation.row_id),
400 payload,
401 existing_row,
402 &rules,
403 YJS_PAYLOAD_KEY,
404 true,
405 true,
406 )?);
407 Ok(())
408}
409
410pub fn transform_local_row_for_metadata(
411 table: &str,
412 row_id: &str,
413 local_row: Option<Value>,
414 operation_payload: Option<&Value>,
415 existing_row: Option<&Value>,
416 metadata: &AppTableMetadata,
417) -> Result<Option<Value>> {
418 let rules = rules_from_metadata(metadata)?;
419 if rules.is_empty() {
420 return Ok(local_row);
421 }
422 let local_row = match local_row {
423 Some(local_row) => local_row,
424 None => {
425 let Some(operation_payload) = operation_payload else {
426 return Ok(None);
427 };
428 if !has_envelope(operation_payload, YJS_PAYLOAD_KEY) {
429 return Ok(None);
430 }
431 if let Some(existing_row) = existing_row {
432 merge_operation_payload_into_local_row(
433 existing_row.clone(),
434 operation_payload,
435 metadata,
436 YJS_PAYLOAD_KEY,
437 )
438 } else {
439 let Value::Object(mut row) = operation_payload.clone() else {
440 return Ok(None);
441 };
442 strip_enveloped_materialized_fields(&mut row, metadata, YJS_PAYLOAD_KEY);
443 row.insert(
444 metadata.primary_key_column.to_string(),
445 Value::String(row_id.to_string()),
446 );
447 Value::Object(row)
448 }
449 }
450 };
451 let local_row =
452 with_operation_envelope(local_row, operation_payload, metadata, YJS_PAYLOAD_KEY);
453 if !has_envelope(&local_row, YJS_PAYLOAD_KEY) {
454 return Ok(Some(local_row));
455 }
456 Ok(Some(transform_payload(
457 table,
458 Some(row_id),
459 local_row,
460 existing_row,
461 &rules,
462 YJS_PAYLOAD_KEY,
463 true,
464 true,
465 )?))
466}
467
468pub fn materialize_row_for_metadata(
469 table: &str,
470 row_id: Option<&str>,
471 row: Value,
472 metadata: &AppTableMetadata,
473) -> Result<Value> {
474 let rules = rules_from_metadata(metadata)?;
475 if rules.is_empty() {
476 return Ok(row);
477 }
478 materialize_row(table, row_id, row, &rules, YJS_PAYLOAD_KEY, true)
479}
480
481pub fn rules_from_metadata(metadata: &AppTableMetadata) -> Result<Vec<YjsFieldRule>> {
482 metadata
483 .crdt_yjs_fields
484 .iter()
485 .filter(|field| field.sync_mode == "server-merge" || field.sync_mode.is_empty())
486 .map(|field| rule_from_metadata(metadata.name, field))
487 .collect()
488}
489
490fn rule_from_metadata(table: &str, field: &CrdtYjsFieldMetadata) -> Result<YjsFieldRule> {
491 Ok(YjsFieldRule {
492 table: table.to_string(),
493 field: field.field.to_string(),
494 state_column: field.state_column.to_string(),
495 container_key: Some(field.container_key.to_string()),
496 row_id_field: Some(field.row_id_field.to_string()),
497 kind: YjsFieldKind::from_metadata(field.kind)?,
498 })
499}
500
501#[cfg(feature = "crdt-yjs")]
502fn transform_payload(
503 table: &str,
504 row_id: Option<&str>,
505 payload: Value,
506 existing_row: Option<&Value>,
507 rules: &[YjsFieldRule],
508 envelope_key: &str,
509 strict: bool,
510 strip_envelope: bool,
511) -> Result<Value> {
512 let mut payload = match payload {
513 Value::Object(payload) => payload,
514 other => return Ok(other),
515 };
516 let table_rules = table_rule_index(table, rules)?;
517 let raw_envelope = payload.get(envelope_key).cloned();
518
519 if table_rules.is_empty() {
520 if raw_envelope.is_some() && strict {
521 return Err(SyncularError::protocol_message(format!(
522 "Yjs envelope provided for table \"{table}\" without matching rules"
523 )));
524 }
525 if strip_envelope {
526 payload.remove(envelope_key);
527 }
528 return Ok(Value::Object(payload));
529 }
530
531 let Some(raw_envelope) = raw_envelope else {
532 if strip_envelope {
533 payload.remove(envelope_key);
534 }
535 return Ok(Value::Object(payload));
536 };
537 let Value::Object(envelope) = raw_envelope else {
538 return Err(SyncularError::protocol_message(format!(
539 "Yjs payload key \"{envelope_key}\" must be an object for table \"{table}\""
540 )));
541 };
542
543 for (field, raw_update_input) in envelope {
544 let Some(rule) = table_rules.get(&field) else {
545 if strict {
546 return Err(SyncularError::protocol_message(format!(
547 "No Yjs rule found for envelope field \"{field}\" on table \"{table}\""
548 )));
549 }
550 continue;
551 };
552 let updates =
553 normalize_update_envelopes(raw_update_input, &format!("yjs.{table}.{field}"))?;
554 let base_state = existing_row
555 .and_then(|row| state_value_to_base64(row.get(&rule.state_column)))
556 .or_else(|| state_value_to_base64(payload.get(&rule.state_column)));
557 if base_state.is_none() {
558 if let Some(required) = updates
559 .iter()
560 .find_map(|update| required_state_vector(update))
561 {
562 return Err(SyncularError::protocol_message(format!(
563 "Yjs diff envelope for table \"{table}\" row \"{}\" field \"{field}\" requires local base state vector {required}, but no local state is available; {FULL_SNAPSHOT_RESYNC_REQUIRED}",
564 row_id.unwrap_or("<unknown>")
565 )));
566 }
567 }
568 let doc = create_doc_from_state(base_state.as_deref())?;
569 if base_state.is_none() {
570 seed_rule_value_from_rows(&doc, rule, &Map::new(), existing_row);
571 }
572 apply_updates(&doc, &updates)?;
573 let next_value = materialize_rule_value(&doc, rule)?;
574 let next_state = {
575 let txn = doc.transact();
576 encode_base64(&txn.encode_state_as_update_v1(&StateVector::default()))
577 };
578 payload.insert(rule.field.clone(), next_value);
579 payload.insert(rule.state_column.clone(), Value::String(next_state));
580 }
581
582 if strip_envelope {
583 payload.remove(envelope_key);
584 }
585 Ok(Value::Object(payload))
586}
587
588#[cfg(not(feature = "crdt-yjs"))]
589fn transform_payload(
590 table: &str,
591 _row_id: Option<&str>,
592 payload: Value,
593 _existing_row: Option<&Value>,
594 rules: &[YjsFieldRule],
595 envelope_key: &str,
596 strict: bool,
597 strip_envelope: bool,
598) -> Result<Value> {
599 let mut payload = match payload {
600 Value::Object(payload) => payload,
601 other => return Ok(other),
602 };
603 let table_rules = table_rule_index(table, rules)?;
604 let raw_envelope = payload.get(envelope_key).cloned();
605
606 if table_rules.is_empty() {
607 if raw_envelope.is_some() && strict {
608 return Err(SyncularError::protocol_message(format!(
609 "Yjs envelope provided for table \"{table}\" without matching rules"
610 )));
611 }
612 if strip_envelope {
613 payload.remove(envelope_key);
614 }
615 return Ok(Value::Object(payload));
616 }
617
618 if raw_envelope.is_some() {
619 return Err(crdt_yjs_feature_disabled());
620 }
621 if strip_envelope {
622 payload.remove(envelope_key);
623 }
624 Ok(Value::Object(payload))
625}
626
627#[cfg(feature = "crdt-yjs")]
628fn materialize_row(
629 table: &str,
630 _row_id: Option<&str>,
631 row: Value,
632 rules: &[YjsFieldRule],
633 envelope_key: &str,
634 strip_envelope: bool,
635) -> Result<Value> {
636 let mut row = match row {
637 Value::Object(row) => row,
638 other => return Ok(other),
639 };
640 for rule in table_rule_index(table, rules)?.values() {
641 let Some(state_base64) = state_value_to_base64(row.get(&rule.state_column)) else {
642 continue;
643 };
644 let doc = create_doc_from_state(Some(&state_base64))?;
645 row.insert(rule.field.clone(), materialize_rule_value(&doc, rule)?);
646 }
647 if strip_envelope {
648 row.remove(envelope_key);
649 }
650 Ok(Value::Object(row))
651}
652
653#[cfg(not(feature = "crdt-yjs"))]
654fn materialize_row(
655 table: &str,
656 _row_id: Option<&str>,
657 row: Value,
658 rules: &[YjsFieldRule],
659 envelope_key: &str,
660 strip_envelope: bool,
661) -> Result<Value> {
662 let mut row = match row {
663 Value::Object(row) => row,
664 other => return Ok(other),
665 };
666 let table_rules = table_rule_index(table, rules)?;
667 if !table_rules.is_empty() {
668 return Err(crdt_yjs_feature_disabled());
669 }
670 if strip_envelope {
671 row.remove(envelope_key);
672 }
673 Ok(Value::Object(row))
674}
675
676fn table_rule_index(table: &str, rules: &[YjsFieldRule]) -> Result<BTreeMap<String, YjsFieldRule>> {
677 let mut out = BTreeMap::new();
678 let mut seen = BTreeSet::new();
679 for rule in rules.iter().filter(|rule| rule.table == table) {
680 if rule.field.trim().is_empty() {
681 return Err(SyncularError::config(
682 "Yjs field rule field cannot be empty",
683 ));
684 }
685 if rule.state_column.trim().is_empty() {
686 return Err(SyncularError::config(
687 "Yjs field rule stateColumn cannot be empty",
688 ));
689 }
690 if !seen.insert(rule.field.clone()) {
691 return Err(SyncularError::config(format!(
692 "duplicate Yjs rule for table \"{table}\", field \"{}\"",
693 rule.field
694 )));
695 }
696 out.insert(
697 rule.field.clone(),
698 YjsFieldRule {
699 table: rule.table.clone(),
700 field: rule.field.clone(),
701 state_column: rule.state_column.clone(),
702 container_key: Some(
703 rule.container_key
704 .clone()
705 .unwrap_or_else(|| rule.field.clone()),
706 ),
707 row_id_field: Some(
708 rule.row_id_field
709 .clone()
710 .unwrap_or_else(|| "id".to_string()),
711 ),
712 kind: rule.kind,
713 },
714 );
715 }
716 Ok(out)
717}
718
719#[cfg(feature = "crdt-yjs")]
720fn normalize_update_envelopes(value: Value, context: &str) -> Result<Vec<YjsUpdateEnvelope>> {
721 match value {
722 Value::Array(values) => values
723 .into_iter()
724 .enumerate()
725 .map(|(index, value)| normalize_update_envelope(value, &format!("{context}[{index}]")))
726 .collect(),
727 value => Ok(vec![normalize_update_envelope(value, context)?]),
728 }
729}
730
731#[cfg(feature = "crdt-yjs")]
732fn normalize_update_envelope(value: Value, context: &str) -> Result<YjsUpdateEnvelope> {
733 let envelope: YjsUpdateEnvelope = serde_json::from_value(value).map_err(|err| {
734 SyncularError::protocol_message(format!("{context} must be a Yjs update envelope: {err}"))
735 })?;
736 if envelope.update_id.trim().is_empty() {
737 return Err(SyncularError::protocol_message(format!(
738 "{context}.updateId must be a non-empty string"
739 )));
740 }
741 if envelope.update_base64.trim().is_empty() {
742 return Err(SyncularError::protocol_message(format!(
743 "{context}.updateBase64 must be a non-empty base64 string"
744 )));
745 }
746 if envelope
747 .requires_state_vector_base64
748 .as_deref()
749 .is_some_and(|value| value.trim().is_empty())
750 {
751 return Err(SyncularError::protocol_message(format!(
752 "{context}.requiresStateVectorBase64 must be a non-empty base64 string when provided"
753 )));
754 }
755 validate_yjs_update_envelope_size(&envelope)?;
756 Ok(envelope)
757}
758
759#[cfg(feature = "crdt-yjs")]
760fn create_doc_from_state(state_base64: Option<&str>) -> Result<Doc> {
761 let doc = Doc::new();
762 if let Some(state_base64) = state_base64.filter(|value| !value.trim().is_empty()) {
763 validate_yjs_state_base64_size(state_base64)?;
764 let bytes = decode_base64(state_base64)?;
765 let update = Update::decode_v1(bytes.as_slice())
766 .map_err(|err| SyncularError::protocol_message(format!("decode Yjs state: {err}")))?;
767 doc.transact_mut()
768 .apply_update(update)
769 .map_err(|err| SyncularError::protocol_message(format!("apply Yjs state: {err}")))?;
770 }
771 Ok(doc)
772}
773
774#[cfg(feature = "crdt-yjs")]
775fn apply_updates(doc: &Doc, updates: &[YjsUpdateEnvelope]) -> Result<()> {
776 validate_yjs_update_envelope_list_size(updates)?;
777 let mut txn = doc.transact_mut();
778 for update in updates {
779 if let Some(required) = required_state_vector(update) {
780 let actual = encode_base64(&txn.state_vector().encode_v1());
781 if actual != required {
782 return Err(SyncularError::protocol_message(format!(
783 "Yjs update {} requires base state vector {required}, but current state vector is {actual}; {FULL_SNAPSHOT_RESYNC_REQUIRED}",
784 update.update_id
785 )));
786 }
787 }
788 let bytes = decode_base64(&update.update_base64)?;
789 let decoded = Update::decode_v1(bytes.as_slice()).map_err(|err| {
790 SyncularError::protocol_message(format!(
791 "decode Yjs update {}: {err}",
792 update.update_id
793 ))
794 })?;
795 txn.apply_update(decoded).map_err(|err| {
796 SyncularError::protocol_message(format!("apply Yjs update {}: {err}", update.update_id))
797 })?;
798 }
799 Ok(())
800}
801
802#[cfg(feature = "crdt-yjs")]
803fn required_state_vector(update: &YjsUpdateEnvelope) -> Option<&str> {
804 update
805 .requires_state_vector_base64
806 .as_deref()
807 .filter(|value| !value.trim().is_empty())
808}
809
810#[cfg(feature = "crdt-yjs")]
811fn patch_text(doc: &Doc, container_key: &str, next_text: &str) {
812 let text = doc.get_or_insert_text(container_key);
813 let current_text = {
814 let txn = doc.transact();
815 text.get_string(&txn)
816 };
817 if current_text == next_text {
818 return;
819 }
820
821 let prefix_len = common_prefix_boundary(¤t_text, next_text);
822 let (current_suffix_start, next_suffix_start) =
823 common_suffix_boundaries(¤t_text, next_text, prefix_len);
824
825 let delete_len = current_suffix_start - prefix_len;
826 let insert_segment = &next_text[prefix_len..next_suffix_start];
827 let mut txn = doc.transact_mut();
828 if delete_len > 0 {
829 text.remove_range(&mut txn, prefix_len as u32, delete_len as u32);
830 }
831 if !insert_segment.is_empty() {
832 text.insert(&mut txn, prefix_len as u32, insert_segment);
833 }
834}
835
836#[cfg(feature = "crdt-yjs")]
837fn common_prefix_boundary(left: &str, right: &str) -> usize {
838 let mut prefix = 0;
839 for ((left_index, left_char), (right_index, right_char)) in
840 left.char_indices().zip(right.char_indices())
841 {
842 if left_index != right_index || left_char != right_char {
843 break;
844 }
845 prefix = left_index + left_char.len_utf8();
846 }
847 prefix
848}
849
850#[cfg(feature = "crdt-yjs")]
851fn common_suffix_boundaries(left: &str, right: &str, prefix_len: usize) -> (usize, usize) {
852 let mut left_start = left.len();
853 let mut right_start = right.len();
854 let mut left_chars = left[prefix_len..].char_indices().rev();
855 let mut right_chars = right[prefix_len..].char_indices().rev();
856
857 while let (Some((left_index, left_char)), Some((right_index, right_char))) =
858 (left_chars.next(), right_chars.next())
859 {
860 if left_char != right_char {
861 break;
862 }
863 left_start = prefix_len + left_index;
864 right_start = prefix_len + right_index;
865 }
866
867 (left_start, right_start)
868}
869
870#[cfg(feature = "crdt-yjs")]
871fn seed_rule_value_from_rows(
872 doc: &Doc,
873 rule: &YjsFieldRule,
874 payload: &Map<String, Value>,
875 existing_row: Option<&Value>,
876) {
877 if rule.kind != YjsFieldKind::Text {
878 return;
879 }
880 let initial = payload
881 .get(&rule.field)
882 .and_then(Value::as_str)
883 .or_else(|| existing_row.and_then(|row| row.get(&rule.field).and_then(Value::as_str)));
884 if let Some(initial) = initial.filter(|value| !value.is_empty()) {
885 patch_text(
886 doc,
887 rule.container_key.as_deref().unwrap_or(&rule.field),
888 initial,
889 );
890 }
891}
892
893#[cfg(feature = "crdt-yjs")]
894fn materialize_rule_value(doc: &Doc, rule: &YjsFieldRule) -> Result<Value> {
895 let container_key = rule.container_key.as_deref().unwrap_or(&rule.field);
896 let text_ref;
897 let xml_ref;
898 match rule.kind {
899 YjsFieldKind::Text => {
900 text_ref = Some(doc.get_or_insert_text(container_key));
901 xml_ref = None;
902 }
903 YjsFieldKind::XmlFragment | YjsFieldKind::Prosemirror => {
904 text_ref = None;
905 xml_ref = Some(doc.get_or_insert_xml_fragment(container_key));
906 }
907 }
908 let txn = doc.transact();
909 let value = match rule.kind {
910 YjsFieldKind::Text => text_ref.expect("text ref is initialized").get_string(&txn),
911 YjsFieldKind::XmlFragment | YjsFieldKind::Prosemirror => {
912 xml_ref.expect("xml ref is initialized").get_string(&txn)
913 }
914 };
915 Ok(Value::String(value))
916}
917
918fn has_envelope(value: &Value, envelope_key: &str) -> bool {
919 value
920 .as_object()
921 .is_some_and(|object| object.contains_key(envelope_key))
922}
923
924fn with_operation_envelope(
925 local_row: Value,
926 operation_payload: Option<&Value>,
927 metadata: &AppTableMetadata,
928 envelope_key: &str,
929) -> Value {
930 if has_envelope(&local_row, envelope_key) {
931 return local_row;
932 }
933 let Some(envelope) = operation_payload.and_then(|payload| payload.get(envelope_key)) else {
934 return local_row;
935 };
936 let Value::Object(mut row) = local_row else {
937 return local_row;
938 };
939 row.insert(envelope_key.to_string(), envelope.clone());
940 strip_enveloped_materialized_fields(&mut row, metadata, envelope_key);
941 Value::Object(row)
942}
943
944fn merge_operation_payload_into_local_row(
945 local_row: Value,
946 operation_payload: &Value,
947 metadata: &AppTableMetadata,
948 envelope_key: &str,
949) -> Value {
950 match local_row {
951 Value::Object(mut row) => {
952 if let Value::Object(payload) = operation_payload {
953 for (key, value) in payload {
954 row.insert(key.clone(), value.clone());
955 }
956 }
957 strip_enveloped_materialized_fields(&mut row, metadata, envelope_key);
958 Value::Object(row)
959 }
960 other => other,
961 }
962}
963
964fn strip_enveloped_materialized_fields(
965 row: &mut Map<String, Value>,
966 metadata: &AppTableMetadata,
967 envelope_key: &str,
968) {
969 let Some(envelope) = row.get(envelope_key).and_then(Value::as_object) else {
970 return;
971 };
972 let enveloped_fields = envelope.keys().cloned().collect::<Vec<_>>();
973 for field_name in enveloped_fields {
974 if let Some(field) = metadata
975 .crdt_yjs_fields
976 .iter()
977 .find(|candidate| candidate.field == field_name)
978 {
979 row.remove(field.field);
980 row.remove(field.state_column);
981 }
982 }
983}
984
985#[cfg(feature = "crdt-yjs")]
986fn state_value_to_base64(value: Option<&Value>) -> Option<String> {
987 match value? {
988 Value::String(value) if !value.is_empty() => Some(value.clone()),
989 _ => None,
990 }
991}
992
993#[cfg(feature = "crdt-yjs")]
994fn encode_base64(bytes: &[u8]) -> String {
995 BASE64.encode(bytes)
996}
997
998#[cfg(feature = "crdt-yjs")]
999fn decode_base64(value: &str) -> Result<Vec<u8>> {
1000 BASE64
1001 .decode(value)
1002 .map_err(|err| SyncularError::protocol_message(format!("invalid base64 string: {err}")))
1003}
1004
1005#[cfg(not(feature = "crdt-yjs"))]
1006fn crdt_yjs_feature_disabled() -> SyncularError {
1007 SyncularError::config("CRDT Yjs support is not enabled in this Syncular runtime build")
1008}
1009
1010#[cfg(all(test, feature = "crdt-yjs"))]
1011mod tests {
1012 use super::*;
1013 use serde_json::json;
1014
1015 #[test]
1016 fn text_updates_merge_concurrent_changes() -> Result<()> {
1017 let base = build_yjs_text_update(BuildYjsTextUpdateArgs {
1018 previous_state_base64: None,
1019 next_text: "middle".to_string(),
1020 container_key: None,
1021 update_id: Some("base".to_string()),
1022 })?;
1023 let prepend = build_yjs_text_update(BuildYjsTextUpdateArgs {
1024 previous_state_base64: Some(base.next_state_base64.clone()),
1025 next_text: "left middle".to_string(),
1026 container_key: None,
1027 update_id: Some("prepend".to_string()),
1028 })?;
1029 let append = build_yjs_text_update(BuildYjsTextUpdateArgs {
1030 previous_state_base64: Some(base.next_state_base64.clone()),
1031 next_text: "middle right".to_string(),
1032 container_key: None,
1033 update_id: Some("append".to_string()),
1034 })?;
1035
1036 let forward = apply_yjs_text_updates(ApplyYjsTextUpdatesArgs {
1037 previous_state_base64: Some(base.next_state_base64.clone()),
1038 updates: vec![prepend.update.clone(), append.update.clone()],
1039 container_key: None,
1040 })?;
1041 let reverse = apply_yjs_text_updates(ApplyYjsTextUpdatesArgs {
1042 previous_state_base64: Some(base.next_state_base64),
1043 updates: vec![append.update, prepend.update],
1044 container_key: None,
1045 })?;
1046
1047 assert_eq!(forward.text, reverse.text);
1048 assert!(forward.text.contains("left"));
1049 assert!(forward.text.contains("middle"));
1050 assert!(forward.text.contains("right"));
1051 Ok(())
1052 }
1053
1054 #[test]
1055 fn envelope_materializes_payload_and_strips_transport_key() -> Result<()> {
1056 let base = build_yjs_text_update(BuildYjsTextUpdateArgs {
1057 previous_state_base64: None,
1058 next_text: "hello".to_string(),
1059 container_key: Some("title".to_string()),
1060 update_id: Some("base".to_string()),
1061 })?;
1062 let next = build_yjs_text_update(BuildYjsTextUpdateArgs {
1063 previous_state_base64: Some(base.next_state_base64.clone()),
1064 next_text: "hello world".to_string(),
1065 container_key: Some("title".to_string()),
1066 update_id: Some("next".to_string()),
1067 })?;
1068 let result = apply_yjs_envelope_to_payload(ApplyYjsEnvelopeToPayloadArgs {
1069 table: "tasks".to_string(),
1070 row_id: Some("task-1".to_string()),
1071 payload: json!({ "__yjs": { "title": next.update } }),
1072 existing_row: Some(json!({
1073 "id": "task-1",
1074 "title": "hello",
1075 "title_yjs_state": base.next_state_base64
1076 })),
1077 rules: vec![YjsFieldRule {
1078 table: "tasks".to_string(),
1079 field: "title".to_string(),
1080 state_column: "title_yjs_state".to_string(),
1081 container_key: Some("title".to_string()),
1082 row_id_field: Some("id".to_string()),
1083 kind: YjsFieldKind::Text,
1084 }],
1085 envelope_key: None,
1086 strict: None,
1087 strip_envelope: None,
1088 })?;
1089
1090 assert_eq!(result.payload["title"], "hello world");
1091 assert!(result.payload["title_yjs_state"].as_str().is_some());
1092 assert!(result.payload.get(YJS_PAYLOAD_KEY).is_none());
1093 Ok(())
1094 }
1095
1096 #[test]
1097 fn initial_yjs_envelope_does_not_duplicate_plain_payload_text() -> Result<()> {
1098 let initial = build_yjs_text_update(BuildYjsTextUpdateArgs {
1099 previous_state_base64: None,
1100 next_text: "Draft".to_string(),
1101 container_key: Some("title".to_string()),
1102 update_id: Some("initial".to_string()),
1103 })?;
1104 let result = apply_yjs_envelope_to_payload(ApplyYjsEnvelopeToPayloadArgs {
1105 table: "tasks".to_string(),
1106 row_id: Some("task-1".to_string()),
1107 payload: json!({
1108 "title": "Draft",
1109 "__yjs": { "title": initial.update }
1110 }),
1111 existing_row: None,
1112 rules: vec![YjsFieldRule {
1113 table: "tasks".to_string(),
1114 field: "title".to_string(),
1115 state_column: "title_yjs_state".to_string(),
1116 container_key: Some("title".to_string()),
1117 row_id_field: Some("id".to_string()),
1118 kind: YjsFieldKind::Text,
1119 }],
1120 envelope_key: None,
1121 strict: None,
1122 strip_envelope: None,
1123 })?;
1124
1125 assert_eq!(result.payload["title"], "Draft");
1126 assert!(result.payload["title_yjs_state"].as_str().is_some());
1127 assert!(result.payload.get(YJS_PAYLOAD_KEY).is_none());
1128 Ok(())
1129 }
1130
1131 #[test]
1132 fn diff_envelope_remote_rows_preserve_non_crdt_payload_fields() -> Result<()> {
1133 static CRDT_FIELDS: &[CrdtYjsFieldMetadata] = &[CrdtYjsFieldMetadata {
1134 field: "title",
1135 state_column: "title_yjs_state",
1136 container_key: "title",
1137 row_id_field: "id",
1138 kind: "text",
1139 sync_mode: "server-merge",
1140 }];
1141 static TABLE: AppTableMetadata = AppTableMetadata {
1142 name: "tasks",
1143 primary_key_column: "id",
1144 server_version_column: "server_version",
1145 soft_delete_column: None,
1146 subscription_id: "tasks",
1147 columns: &[],
1148 blob_columns: &[],
1149 crdt_yjs_fields: CRDT_FIELDS,
1150 encrypted_fields: &[],
1151 scopes: &[],
1152 };
1153
1154 let base = build_yjs_text_update(BuildYjsTextUpdateArgs {
1155 previous_state_base64: None,
1156 next_text: "hello".to_string(),
1157 container_key: Some("title".to_string()),
1158 update_id: Some("base".to_string()),
1159 })?;
1160 let next = build_yjs_text_update(BuildYjsTextUpdateArgs {
1161 previous_state_base64: Some(base.next_state_base64.clone()),
1162 next_text: "hello world".to_string(),
1163 container_key: Some("title".to_string()),
1164 update_id: Some("next".to_string()),
1165 })?;
1166 let mut next_update = next.update;
1167 next_update.requires_state_vector_base64 =
1168 Some(yjs_state_vector_base64(Some(&base.next_state_base64))?);
1169
1170 let row = transform_local_row_for_metadata(
1171 "tasks",
1172 "task-1",
1173 None,
1174 Some(&json!({
1175 "updated_at": 2,
1176 "__yjs": { "title": next_update }
1177 })),
1178 Some(&json!({
1179 "id": "task-1",
1180 "title": "hello",
1181 "title_yjs_state": base.next_state_base64,
1182 "updated_at": 1
1183 })),
1184 &TABLE,
1185 )?
1186 .expect("diff envelope materializes existing row");
1187
1188 assert_eq!(row["title"], "hello world");
1189 assert_eq!(row["updated_at"], 2);
1190 assert!(row["title_yjs_state"].as_str().is_some());
1191 assert!(row.get(YJS_PAYLOAD_KEY).is_none());
1192 Ok(())
1193 }
1194
1195 #[test]
1196 fn diff_envelope_without_required_local_base_requests_resync() -> Result<()> {
1197 static CRDT_FIELDS: &[CrdtYjsFieldMetadata] = &[CrdtYjsFieldMetadata {
1198 field: "title",
1199 state_column: "title_yjs_state",
1200 container_key: "title",
1201 row_id_field: "id",
1202 kind: "text",
1203 sync_mode: "server-merge",
1204 }];
1205 static TABLE: AppTableMetadata = AppTableMetadata {
1206 name: "tasks",
1207 primary_key_column: "id",
1208 server_version_column: "server_version",
1209 soft_delete_column: None,
1210 subscription_id: "tasks",
1211 columns: &[],
1212 blob_columns: &[],
1213 crdt_yjs_fields: CRDT_FIELDS,
1214 encrypted_fields: &[],
1215 scopes: &[],
1216 };
1217
1218 let base = build_yjs_text_update(BuildYjsTextUpdateArgs {
1219 previous_state_base64: None,
1220 next_text: "hello".to_string(),
1221 container_key: Some("title".to_string()),
1222 update_id: Some("base".to_string()),
1223 })?;
1224 let next = build_yjs_text_update(BuildYjsTextUpdateArgs {
1225 previous_state_base64: Some(base.next_state_base64.clone()),
1226 next_text: "hello world".to_string(),
1227 container_key: Some("title".to_string()),
1228 update_id: Some("next".to_string()),
1229 })?;
1230 let mut next_update = next.update;
1231 next_update.requires_state_vector_base64 =
1232 Some(yjs_state_vector_base64(Some(&base.next_state_base64))?);
1233
1234 let err = transform_local_row_for_metadata(
1235 "tasks",
1236 "task-1",
1237 None,
1238 Some(&json!({
1239 "__yjs": { "title": next_update }
1240 })),
1241 None,
1242 &TABLE,
1243 )
1244 .expect_err("server diff without local base must request resync");
1245
1246 let message = err.to_string();
1247 assert!(message.contains("tasks"));
1248 assert!(message.contains("task-1"));
1249 assert!(message.contains("title"));
1250 assert!(message.contains("full snapshot resync required"));
1251 Ok(())
1252 }
1253}
1254
1255#[cfg(all(test, not(feature = "crdt-yjs")))]
1256mod tests_without_crdt_yjs {
1257 use super::*;
1258
1259 #[test]
1260 fn yjs_operations_report_disabled_feature() {
1261 let err = build_yjs_text_update(BuildYjsTextUpdateArgs {
1262 previous_state_base64: None,
1263 next_text: "hello".to_string(),
1264 container_key: None,
1265 update_id: None,
1266 })
1267 .expect_err("CRDT/Yjs operation should require crdt-yjs feature");
1268
1269 assert!(err.to_string().contains("CRDT Yjs support is not enabled"));
1270 }
1271}