Skip to main content

syncular_runtime/core/
crdt_yjs.rs

1use crate::app_schema::{AppTableMetadata, CrdtYjsFieldMetadata};
2#[cfg(feature = "crdt-yjs")]
3use crate::error::FULL_SNAPSHOT_RESYNC_REQUIRED;
4use crate::error::{Result, SyncularError};
5use crate::limits::{
6    MAX_CRDT_REQUEST_JSON_BYTES, MAX_CRDT_STATE_BASE64_BYTES, MAX_CRDT_STATE_VECTOR_BASE64_BYTES,
7    MAX_CRDT_TEXT_BYTES, MAX_CRDT_UPDATE_BASE64_BYTES,
8};
9#[cfg(feature = "crdt-yjs")]
10use crate::protocol::random_syncular_id;
11use crate::protocol::{validate_payload_bytes, SyncOperation};
12#[cfg(feature = "crdt-yjs")]
13use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
14use serde::{Deserialize, Serialize};
15use serde_json::{Map, Value};
16use std::collections::{BTreeMap, BTreeSet};
17#[cfg(feature = "crdt-yjs")]
18use yrs::updates::decoder::Decode;
19#[cfg(feature = "crdt-yjs")]
20use yrs::updates::encoder::Encode;
21#[cfg(feature = "crdt-yjs")]
22use yrs::{Doc, GetString, ReadTxn, StateVector, Text, Transact, Update};
23
/// Default payload/row key under which Yjs update envelopes travel
/// (used whenever a caller does not supply its own `envelopeKey`).
pub const YJS_PAYLOAD_KEY: &str = "__yjs";
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "kebab-case")]
28pub enum YjsFieldKind {
29    Text,
30    XmlFragment,
31    Prosemirror,
32}
33
34impl Default for YjsFieldKind {
35    fn default() -> Self {
36        Self::Text
37    }
38}
39
40impl YjsFieldKind {
41    pub fn from_metadata(value: &str) -> Result<Self> {
42        match value {
43            "text" => Ok(Self::Text),
44            "xml-fragment" => Ok(Self::XmlFragment),
45            "prosemirror" => Ok(Self::Prosemirror),
46            other => Err(SyncularError::config(format!(
47                "unsupported Yjs field kind: {other}"
48            ))),
49        }
50    }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54#[serde(rename_all = "camelCase")]
55pub struct YjsFieldRule {
56    pub table: String,
57    pub field: String,
58    pub state_column: String,
59    #[serde(default)]
60    pub container_key: Option<String>,
61    #[serde(default)]
62    pub row_id_field: Option<String>,
63    #[serde(default)]
64    pub kind: YjsFieldKind,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct YjsUpdateEnvelope {
70    pub update_id: String,
71    pub update_base64: String,
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub requires_state_vector_base64: Option<String>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "camelCase")]
78pub struct BuildYjsTextUpdateArgs {
79    #[serde(default)]
80    pub previous_state_base64: Option<String>,
81    pub next_text: String,
82    #[serde(default)]
83    pub container_key: Option<String>,
84    #[serde(default)]
85    pub update_id: Option<String>,
86}
87
88#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
89#[serde(rename_all = "camelCase")]
90pub struct BuildYjsTextUpdateResult {
91    pub update: YjsUpdateEnvelope,
92    pub next_state_base64: String,
93    pub next_text: String,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97#[serde(rename_all = "camelCase")]
98pub struct ApplyYjsTextUpdatesArgs {
99    #[serde(default)]
100    pub previous_state_base64: Option<String>,
101    pub updates: Vec<YjsUpdateEnvelope>,
102    #[serde(default)]
103    pub container_key: Option<String>,
104}
105
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
107#[serde(rename_all = "camelCase")]
108pub struct ApplyYjsTextUpdatesResult {
109    pub next_state_base64: String,
110    pub text: String,
111}
112
113#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
114#[serde(rename_all = "camelCase")]
115pub struct ApplyYjsUpdatesToStateArgs {
116    #[serde(default)]
117    pub previous_state_base64: Option<String>,
118    pub updates: Vec<YjsUpdateEnvelope>,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
122#[serde(rename_all = "camelCase")]
123pub struct ApplyYjsUpdatesToStateResult {
124    pub next_state_base64: String,
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[serde(rename_all = "camelCase")]
129pub struct ApplyYjsEnvelopeToPayloadArgs {
130    pub table: String,
131    #[serde(default)]
132    pub row_id: Option<String>,
133    pub payload: Value,
134    #[serde(default)]
135    pub existing_row: Option<Value>,
136    pub rules: Vec<YjsFieldRule>,
137    #[serde(default)]
138    pub envelope_key: Option<String>,
139    #[serde(default)]
140    pub strict: Option<bool>,
141    #[serde(default)]
142    pub strip_envelope: Option<bool>,
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
146#[serde(rename_all = "camelCase")]
147pub struct ApplyYjsEnvelopeToPayloadResult {
148    pub payload: Value,
149}
150
151pub fn validate_crdt_request_json_size(request_json: &str) -> Result<()> {
152    validate_payload_bytes(
153        "maxCrdtRequestJsonBytes",
154        request_json.len(),
155        MAX_CRDT_REQUEST_JSON_BYTES,
156        "Syncular CRDT request JSON exceeds the configured limit",
157    )
158}
159
160pub fn validate_yjs_update_envelope_size(update: &YjsUpdateEnvelope) -> Result<()> {
161    validate_payload_bytes(
162        "maxCrdtUpdateBase64Bytes",
163        update.update_base64.len(),
164        MAX_CRDT_UPDATE_BASE64_BYTES,
165        "Syncular CRDT updateBase64 exceeds the configured limit",
166    )?;
167    if let Some(required) = &update.requires_state_vector_base64 {
168        validate_payload_bytes(
169            "maxCrdtStateVectorBase64Bytes",
170            required.len(),
171            MAX_CRDT_STATE_VECTOR_BASE64_BYTES,
172            "Syncular CRDT requiresStateVectorBase64 exceeds the configured limit",
173        )?;
174    }
175    Ok(())
176}
177
178pub fn validate_yjs_update_envelope_list_size(updates: &[YjsUpdateEnvelope]) -> Result<()> {
179    for update in updates {
180        validate_yjs_update_envelope_size(update)?;
181    }
182    Ok(())
183}
184
185pub fn validate_yjs_state_base64_size(state_base64: &str) -> Result<()> {
186    validate_payload_bytes(
187        "maxCrdtStateBase64Bytes",
188        state_base64.len(),
189        MAX_CRDT_STATE_BASE64_BYTES,
190        "Syncular CRDT stateBase64 exceeds the configured limit",
191    )
192}
193
194pub fn validate_yjs_text_input_size(next_text: &str) -> Result<()> {
195    validate_payload_bytes(
196        "maxCrdtTextBytes",
197        next_text.len(),
198        MAX_CRDT_TEXT_BYTES,
199        "Syncular CRDT text exceeds the configured limit",
200    )
201}
202
203#[derive(Debug, Clone, Serialize, Deserialize)]
204#[serde(rename_all = "camelCase")]
205pub struct MaterializeYjsRowArgs {
206    pub table: String,
207    #[serde(default)]
208    pub row_id: Option<String>,
209    pub row: Value,
210    pub rules: Vec<YjsFieldRule>,
211    #[serde(default)]
212    pub envelope_key: Option<String>,
213    #[serde(default)]
214    pub strip_envelope: Option<bool>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
218#[serde(rename_all = "camelCase")]
219pub struct MaterializeYjsRowResult {
220    pub row: Value,
221}
222
223#[cfg(feature = "crdt-yjs")]
224pub fn build_yjs_text_update(args: BuildYjsTextUpdateArgs) -> Result<BuildYjsTextUpdateResult> {
225    validate_yjs_text_input_size(&args.next_text)?;
226    let container_key = args.container_key.unwrap_or_else(|| "text".to_string());
227    let doc = create_doc_from_state(args.previous_state_base64.as_deref())?;
228    let before = {
229        let txn = doc.transact();
230        txn.state_vector()
231    };
232    patch_text(&doc, &container_key, &args.next_text);
233    let text_ref = doc.get_or_insert_text(container_key);
234    let txn = doc.transact();
235    let update = txn.encode_state_as_update_v1(&before);
236    let next_state = txn.encode_state_as_update_v1(&StateVector::default());
237    let next_text = text_ref.get_string(&txn);
238
239    let next_state_base64 = encode_base64(&next_state);
240    validate_yjs_state_base64_size(&next_state_base64)?;
241    Ok(BuildYjsTextUpdateResult {
242        update: YjsUpdateEnvelope {
243            update_id: args.update_id.unwrap_or_else(random_syncular_id),
244            update_base64: encode_base64(&update),
245            requires_state_vector_base64: None,
246        },
247        next_state_base64,
248        next_text,
249    })
250}
251
252#[cfg(not(feature = "crdt-yjs"))]
253pub fn build_yjs_text_update(_args: BuildYjsTextUpdateArgs) -> Result<BuildYjsTextUpdateResult> {
254    Err(crdt_yjs_feature_disabled())
255}
256
257#[cfg(feature = "crdt-yjs")]
258pub fn apply_yjs_text_updates(args: ApplyYjsTextUpdatesArgs) -> Result<ApplyYjsTextUpdatesResult> {
259    let container_key = args.container_key.unwrap_or_else(|| "text".to_string());
260    let doc = create_doc_from_state(args.previous_state_base64.as_deref())?;
261    apply_updates(&doc, &args.updates)?;
262    let text_ref = doc.get_or_insert_text(container_key);
263    let txn = doc.transact();
264    let text = text_ref.get_string(&txn);
265    let next_state = txn.encode_state_as_update_v1(&StateVector::default());
266    let next_state_base64 = encode_base64(&next_state);
267    validate_yjs_state_base64_size(&next_state_base64)?;
268    Ok(ApplyYjsTextUpdatesResult {
269        next_state_base64,
270        text,
271    })
272}
273
274#[cfg(not(feature = "crdt-yjs"))]
275pub fn apply_yjs_text_updates(_args: ApplyYjsTextUpdatesArgs) -> Result<ApplyYjsTextUpdatesResult> {
276    Err(crdt_yjs_feature_disabled())
277}
278
279#[cfg(feature = "crdt-yjs")]
280pub fn apply_yjs_updates_to_state(
281    args: ApplyYjsUpdatesToStateArgs,
282) -> Result<ApplyYjsUpdatesToStateResult> {
283    let doc = create_doc_from_state(args.previous_state_base64.as_deref())?;
284    apply_updates(&doc, &args.updates)?;
285    let next_state = {
286        let txn = doc.transact();
287        txn.encode_state_as_update_v1(&StateVector::default())
288    };
289    let next_state_base64 = encode_base64(&next_state);
290    validate_yjs_state_base64_size(&next_state_base64)?;
291    Ok(ApplyYjsUpdatesToStateResult { next_state_base64 })
292}
293
294#[cfg(not(feature = "crdt-yjs"))]
295pub fn apply_yjs_updates_to_state(
296    _args: ApplyYjsUpdatesToStateArgs,
297) -> Result<ApplyYjsUpdatesToStateResult> {
298    Err(crdt_yjs_feature_disabled())
299}
300
301#[cfg(feature = "crdt-yjs")]
302pub fn materialize_yjs_state(state_base64: &str, rule: &YjsFieldRule) -> Result<Value> {
303    let doc = create_doc_from_state(Some(state_base64))?;
304    materialize_rule_value(&doc, rule)
305}
306
307#[cfg(not(feature = "crdt-yjs"))]
308pub fn materialize_yjs_state(_state_base64: &str, _rule: &YjsFieldRule) -> Result<Value> {
309    Err(crdt_yjs_feature_disabled())
310}
311
312#[cfg(feature = "crdt-yjs")]
313pub fn yjs_state_vector_base64(state_base64: Option<&str>) -> Result<String> {
314    let doc = create_doc_from_state(state_base64)?;
315    let txn = doc.transact();
316    let state_vector_base64 = encode_base64(&txn.state_vector().encode_v1());
317    validate_payload_bytes(
318        "maxCrdtStateVectorBase64Bytes",
319        state_vector_base64.len(),
320        MAX_CRDT_STATE_VECTOR_BASE64_BYTES,
321        "Syncular CRDT stateVectorBase64 exceeds the configured limit",
322    )?;
323    Ok(state_vector_base64)
324}
325
326#[cfg(not(feature = "crdt-yjs"))]
327pub fn yjs_state_vector_base64(_state_base64: Option<&str>) -> Result<String> {
328    Err(crdt_yjs_feature_disabled())
329}
330
331pub fn apply_yjs_envelope_to_payload(
332    args: ApplyYjsEnvelopeToPayloadArgs,
333) -> Result<ApplyYjsEnvelopeToPayloadResult> {
334    let payload = transform_payload(
335        &args.table,
336        args.row_id.as_deref(),
337        args.payload,
338        args.existing_row.as_ref(),
339        &args.rules,
340        args.envelope_key.as_deref().unwrap_or(YJS_PAYLOAD_KEY),
341        args.strict.unwrap_or(true),
342        args.strip_envelope.unwrap_or(true),
343    )?;
344    Ok(ApplyYjsEnvelopeToPayloadResult { payload })
345}
346
347pub fn materialize_yjs_row(args: MaterializeYjsRowArgs) -> Result<MaterializeYjsRowResult> {
348    let row = materialize_row(
349        &args.table,
350        args.row_id.as_deref(),
351        args.row,
352        &args.rules,
353        args.envelope_key.as_deref().unwrap_or(YJS_PAYLOAD_KEY),
354        args.strip_envelope.unwrap_or(true),
355    )?;
356    Ok(MaterializeYjsRowResult { row })
357}
358
359pub fn build_yjs_text_update_json(args_json: &str) -> Result<String> {
360    validate_crdt_request_json_size(args_json)?;
361    let args: BuildYjsTextUpdateArgs = serde_json::from_str(args_json)?;
362    Ok(serde_json::to_string(&build_yjs_text_update(args)?)?)
363}
364
365pub fn apply_yjs_text_updates_json(args_json: &str) -> Result<String> {
366    validate_crdt_request_json_size(args_json)?;
367    let args: ApplyYjsTextUpdatesArgs = serde_json::from_str(args_json)?;
368    Ok(serde_json::to_string(&apply_yjs_text_updates(args)?)?)
369}
370
371pub fn apply_yjs_envelope_to_payload_json(args_json: &str) -> Result<String> {
372    validate_crdt_request_json_size(args_json)?;
373    let args: ApplyYjsEnvelopeToPayloadArgs = serde_json::from_str(args_json)?;
374    Ok(serde_json::to_string(&apply_yjs_envelope_to_payload(
375        args,
376    )?)?)
377}
378
379pub fn materialize_yjs_row_json(args_json: &str) -> Result<String> {
380    validate_crdt_request_json_size(args_json)?;
381    let args: MaterializeYjsRowArgs = serde_json::from_str(args_json)?;
382    Ok(serde_json::to_string(&materialize_yjs_row(args)?)?)
383}
384
385pub fn transform_operation_payload_for_metadata(
386    operation: &mut SyncOperation,
387    existing_row: Option<&Value>,
388    metadata: &AppTableMetadata,
389) -> Result<()> {
390    let rules = rules_from_metadata(metadata)?;
391    if operation.op != "upsert" || rules.is_empty() {
392        return Ok(());
393    }
394    let Some(payload) = operation.payload.take() else {
395        return Ok(());
396    };
397    operation.payload = Some(transform_payload(
398        metadata.name,
399        Some(&operation.row_id),
400        payload,
401        existing_row,
402        &rules,
403        YJS_PAYLOAD_KEY,
404        true,
405        true,
406    )?);
407    Ok(())
408}
409
410pub fn transform_local_row_for_metadata(
411    table: &str,
412    row_id: &str,
413    local_row: Option<Value>,
414    operation_payload: Option<&Value>,
415    existing_row: Option<&Value>,
416    metadata: &AppTableMetadata,
417) -> Result<Option<Value>> {
418    let rules = rules_from_metadata(metadata)?;
419    if rules.is_empty() {
420        return Ok(local_row);
421    }
422    let local_row = match local_row {
423        Some(local_row) => local_row,
424        None => {
425            let Some(operation_payload) = operation_payload else {
426                return Ok(None);
427            };
428            if !has_envelope(operation_payload, YJS_PAYLOAD_KEY) {
429                return Ok(None);
430            }
431            if let Some(existing_row) = existing_row {
432                merge_operation_payload_into_local_row(
433                    existing_row.clone(),
434                    operation_payload,
435                    metadata,
436                    YJS_PAYLOAD_KEY,
437                )
438            } else {
439                let Value::Object(mut row) = operation_payload.clone() else {
440                    return Ok(None);
441                };
442                strip_enveloped_materialized_fields(&mut row, metadata, YJS_PAYLOAD_KEY);
443                row.insert(
444                    metadata.primary_key_column.to_string(),
445                    Value::String(row_id.to_string()),
446                );
447                Value::Object(row)
448            }
449        }
450    };
451    let local_row =
452        with_operation_envelope(local_row, operation_payload, metadata, YJS_PAYLOAD_KEY);
453    if !has_envelope(&local_row, YJS_PAYLOAD_KEY) {
454        return Ok(Some(local_row));
455    }
456    Ok(Some(transform_payload(
457        table,
458        Some(row_id),
459        local_row,
460        existing_row,
461        &rules,
462        YJS_PAYLOAD_KEY,
463        true,
464        true,
465    )?))
466}
467
468pub fn materialize_row_for_metadata(
469    table: &str,
470    row_id: Option<&str>,
471    row: Value,
472    metadata: &AppTableMetadata,
473) -> Result<Value> {
474    let rules = rules_from_metadata(metadata)?;
475    if rules.is_empty() {
476        return Ok(row);
477    }
478    materialize_row(table, row_id, row, &rules, YJS_PAYLOAD_KEY, true)
479}
480
481pub fn rules_from_metadata(metadata: &AppTableMetadata) -> Result<Vec<YjsFieldRule>> {
482    metadata
483        .crdt_yjs_fields
484        .iter()
485        .filter(|field| field.sync_mode == "server-merge" || field.sync_mode.is_empty())
486        .map(|field| rule_from_metadata(metadata.name, field))
487        .collect()
488}
489
490fn rule_from_metadata(table: &str, field: &CrdtYjsFieldMetadata) -> Result<YjsFieldRule> {
491    Ok(YjsFieldRule {
492        table: table.to_string(),
493        field: field.field.to_string(),
494        state_column: field.state_column.to_string(),
495        container_key: Some(field.container_key.to_string()),
496        row_id_field: Some(field.row_id_field.to_string()),
497        kind: YjsFieldKind::from_metadata(field.kind)?,
498    })
499}
500
/// Applies the Yjs envelope stored under `envelope_key` in `payload` to the
/// rule-backed fields of `table`.
///
/// For each envelope field with a matching rule, the updates are applied on top
/// of the base state taken from `existing_row`'s state column (falling back to
/// the payload's own state column). With no base state, the document is seeded
/// from `existing_row`'s plain text value. The rendered value is written to the
/// rule's field and the re-encoded state to its state column. Non-object
/// payloads are returned untouched.
///
/// # Errors
/// - an envelope is present for a table without rules while `strict` is set;
/// - the envelope is not a JSON object;
/// - an envelope field has no rule while `strict` is set;
/// - an update envelope is malformed or oversized, or requires a base state
///   vector that is unavailable or does not match;
/// - the resulting state exceeds the configured state size limit.
#[cfg(feature = "crdt-yjs")]
fn transform_payload(
    table: &str,
    row_id: Option<&str>,
    payload: Value,
    existing_row: Option<&Value>,
    rules: &[YjsFieldRule],
    envelope_key: &str,
    strict: bool,
    strip_envelope: bool,
) -> Result<Value> {
    let mut payload = match payload {
        Value::Object(payload) => payload,
        other => return Ok(other),
    };
    let table_rules = table_rule_index(table, rules)?;
    let raw_envelope = payload.get(envelope_key).cloned();

    if table_rules.is_empty() {
        if raw_envelope.is_some() && strict {
            return Err(SyncularError::protocol_message(format!(
                "Yjs envelope provided for table \"{table}\" without matching rules"
            )));
        }
        if strip_envelope {
            payload.remove(envelope_key);
        }
        return Ok(Value::Object(payload));
    }

    let Some(raw_envelope) = raw_envelope else {
        if strip_envelope {
            payload.remove(envelope_key);
        }
        return Ok(Value::Object(payload));
    };
    let Value::Object(envelope) = raw_envelope else {
        return Err(SyncularError::protocol_message(format!(
            "Yjs payload key \"{envelope_key}\" must be an object for table \"{table}\""
        )));
    };

    for (field, raw_update_input) in envelope {
        let Some(rule) = table_rules.get(&field) else {
            if strict {
                return Err(SyncularError::protocol_message(format!(
                    "No Yjs rule found for envelope field \"{field}\" on table \"{table}\""
                )));
            }
            continue;
        };
        let updates =
            normalize_update_envelopes(raw_update_input, &format!("yjs.{table}.{field}"))?;
        let base_state = existing_row
            .and_then(|row| state_value_to_base64(row.get(&rule.state_column)))
            .or_else(|| state_value_to_base64(payload.get(&rule.state_column)));
        if base_state.is_none() {
            // A diff against a base we do not have can never apply cleanly.
            if let Some(required) = updates
                .iter()
                .find_map(|update| required_state_vector(update))
            {
                return Err(SyncularError::protocol_message(format!(
                    "Yjs diff envelope for table \"{table}\" row \"{}\" field \"{field}\" requires local base state vector {required}, but no local state is available; {FULL_SNAPSHOT_RESYNC_REQUIRED}",
                    row_id.unwrap_or("<unknown>")
                )));
            }
        }
        let doc = create_doc_from_state(base_state.as_deref())?;
        if base_state.is_none() {
            seed_rule_value_from_rows(&doc, rule, &Map::new(), existing_row);
        }
        apply_updates(&doc, &updates)?;
        let next_value = materialize_rule_value(&doc, rule)?;
        let next_state = {
            let txn = doc.transact();
            encode_base64(&txn.encode_state_as_update_v1(&StateVector::default()))
        };
        // Persisting an oversized state would make the row unloadable later:
        // create_doc_from_state rejects states above the same limit.
        validate_yjs_state_base64_size(&next_state)?;
        payload.insert(rule.field.clone(), next_value);
        payload.insert(rule.state_column.clone(), Value::String(next_state));
    }

    if strip_envelope {
        payload.remove(envelope_key);
    }
    Ok(Value::Object(payload))
}
587
588#[cfg(not(feature = "crdt-yjs"))]
589fn transform_payload(
590    table: &str,
591    _row_id: Option<&str>,
592    payload: Value,
593    _existing_row: Option<&Value>,
594    rules: &[YjsFieldRule],
595    envelope_key: &str,
596    strict: bool,
597    strip_envelope: bool,
598) -> Result<Value> {
599    let mut payload = match payload {
600        Value::Object(payload) => payload,
601        other => return Ok(other),
602    };
603    let table_rules = table_rule_index(table, rules)?;
604    let raw_envelope = payload.get(envelope_key).cloned();
605
606    if table_rules.is_empty() {
607        if raw_envelope.is_some() && strict {
608            return Err(SyncularError::protocol_message(format!(
609                "Yjs envelope provided for table \"{table}\" without matching rules"
610            )));
611        }
612        if strip_envelope {
613            payload.remove(envelope_key);
614        }
615        return Ok(Value::Object(payload));
616    }
617
618    if raw_envelope.is_some() {
619        return Err(crdt_yjs_feature_disabled());
620    }
621    if strip_envelope {
622        payload.remove(envelope_key);
623    }
624    Ok(Value::Object(payload))
625}
626
/// Rewrites every rule field of `row` from its state column (rules whose state
/// column holds no state are skipped) and optionally strips the envelope key.
/// Non-object rows are returned untouched.
#[cfg(feature = "crdt-yjs")]
fn materialize_row(
    table: &str,
    _row_id: Option<&str>,
    row: Value,
    rules: &[YjsFieldRule],
    envelope_key: &str,
    strip_envelope: bool,
) -> Result<Value> {
    let mut object = match row {
        Value::Object(object) => object,
        scalar => return Ok(scalar),
    };
    let indexed = table_rule_index(table, rules)?;
    for rule in indexed.values() {
        if let Some(encoded_state) = state_value_to_base64(object.get(&rule.state_column)) {
            let doc = create_doc_from_state(Some(&encoded_state))?;
            let rendered = materialize_rule_value(&doc, rule)?;
            object.insert(rule.field.clone(), rendered);
        }
    }
    if strip_envelope {
        object.remove(envelope_key);
    }
    Ok(Value::Object(object))
}
652
653#[cfg(not(feature = "crdt-yjs"))]
654fn materialize_row(
655    table: &str,
656    _row_id: Option<&str>,
657    row: Value,
658    rules: &[YjsFieldRule],
659    envelope_key: &str,
660    strip_envelope: bool,
661) -> Result<Value> {
662    let mut row = match row {
663        Value::Object(row) => row,
664        other => return Ok(other),
665    };
666    let table_rules = table_rule_index(table, rules)?;
667    if !table_rules.is_empty() {
668        return Err(crdt_yjs_feature_disabled());
669    }
670    if strip_envelope {
671        row.remove(envelope_key);
672    }
673    Ok(Value::Object(row))
674}
675
676fn table_rule_index(table: &str, rules: &[YjsFieldRule]) -> Result<BTreeMap<String, YjsFieldRule>> {
677    let mut out = BTreeMap::new();
678    let mut seen = BTreeSet::new();
679    for rule in rules.iter().filter(|rule| rule.table == table) {
680        if rule.field.trim().is_empty() {
681            return Err(SyncularError::config(
682                "Yjs field rule field cannot be empty",
683            ));
684        }
685        if rule.state_column.trim().is_empty() {
686            return Err(SyncularError::config(
687                "Yjs field rule stateColumn cannot be empty",
688            ));
689        }
690        if !seen.insert(rule.field.clone()) {
691            return Err(SyncularError::config(format!(
692                "duplicate Yjs rule for table \"{table}\", field \"{}\"",
693                rule.field
694            )));
695        }
696        out.insert(
697            rule.field.clone(),
698            YjsFieldRule {
699                table: rule.table.clone(),
700                field: rule.field.clone(),
701                state_column: rule.state_column.clone(),
702                container_key: Some(
703                    rule.container_key
704                        .clone()
705                        .unwrap_or_else(|| rule.field.clone()),
706                ),
707                row_id_field: Some(
708                    rule.row_id_field
709                        .clone()
710                        .unwrap_or_else(|| "id".to_string()),
711                ),
712                kind: rule.kind,
713            },
714        );
715    }
716    Ok(out)
717}
718
/// Accepts either one update envelope or an array of them and parses each,
/// suffixing `context` with `[index]` for array elements.
#[cfg(feature = "crdt-yjs")]
fn normalize_update_envelopes(value: Value, context: &str) -> Result<Vec<YjsUpdateEnvelope>> {
    match value {
        Value::Array(items) => {
            let mut envelopes = Vec::with_capacity(items.len());
            for (position, item) in items.into_iter().enumerate() {
                envelopes.push(normalize_update_envelope(
                    item,
                    &format!("{context}[{position}]"),
                )?);
            }
            Ok(envelopes)
        }
        single => normalize_update_envelope(single, context).map(|envelope| vec![envelope]),
    }
}
730
/// Parses one update envelope and checks that its id and update are
/// non-blank, that a provided required state vector is non-blank, and that it
/// is within the size limits. `context` prefixes every error message.
#[cfg(feature = "crdt-yjs")]
fn normalize_update_envelope(value: Value, context: &str) -> Result<YjsUpdateEnvelope> {
    let envelope = match serde_json::from_value::<YjsUpdateEnvelope>(value) {
        Ok(envelope) => envelope,
        Err(err) => {
            return Err(SyncularError::protocol_message(format!(
                "{context} must be a Yjs update envelope: {err}"
            )))
        }
    };
    let blank_requirement = matches!(
        envelope.requires_state_vector_base64.as_deref(),
        Some(required) if required.trim().is_empty()
    );
    let problem = if envelope.update_id.trim().is_empty() {
        Some(format!("{context}.updateId must be a non-empty string"))
    } else if envelope.update_base64.trim().is_empty() {
        Some(format!(
            "{context}.updateBase64 must be a non-empty base64 string"
        ))
    } else if blank_requirement {
        Some(format!(
            "{context}.requiresStateVectorBase64 must be a non-empty base64 string when provided"
        ))
    } else {
        None
    };
    if let Some(message) = problem {
        return Err(SyncularError::protocol_message(message));
    }
    validate_yjs_update_envelope_size(&envelope)?;
    Ok(envelope)
}
758
/// Creates a document and, unless `state_base64` is `None` or blank, loads
/// the size-checked, base64-decoded v1 state into it.
#[cfg(feature = "crdt-yjs")]
fn create_doc_from_state(state_base64: Option<&str>) -> Result<Doc> {
    let doc = Doc::new();
    let Some(encoded) = state_base64.filter(|value| !value.trim().is_empty()) else {
        return Ok(doc);
    };
    validate_yjs_state_base64_size(encoded)?;
    let raw = decode_base64(encoded)?;
    let update = Update::decode_v1(raw.as_slice())
        .map_err(|err| SyncularError::protocol_message(format!("decode Yjs state: {err}")))?;
    doc.transact_mut()
        .apply_update(update)
        .map_err(|err| SyncularError::protocol_message(format!("apply Yjs state: {err}")))?;
    Ok(doc)
}
773
/// Applies `updates` to `doc` in order, inside a single write transaction.
///
/// An envelope carrying `requiresStateVectorBase64` is applied only when the
/// document's current state vector equals the required one. The comparison is
/// done on decoded state vectors, so encodings of the same vector that differ
/// only in entry order or base64 formatting are accepted. Updates applied
/// before a failing one are not rolled back here.
///
/// # Errors
/// - any envelope exceeds the configured size limits;
/// - a required base state vector does not match (the message includes
///   `FULL_SNAPSHOT_RESYNC_REQUIRED`);
/// - an update fails to base64-decode, Yjs-decode or apply.
#[cfg(feature = "crdt-yjs")]
fn apply_updates(doc: &Doc, updates: &[YjsUpdateEnvelope]) -> Result<()> {
    validate_yjs_update_envelope_list_size(updates)?;
    let mut txn = doc.transact_mut();
    for update in updates {
        if let Some(required) = required_state_vector(update) {
            let current = txn.state_vector();
            let actual = encode_base64(&current.encode_v1());
            if !state_vector_base64_matches(required, &actual, &current) {
                return Err(SyncularError::protocol_message(format!(
                    "Yjs update {} requires base state vector {required}, but current state vector is {actual}; {FULL_SNAPSHOT_RESYNC_REQUIRED}",
                    update.update_id
                )));
            }
        }
        let bytes = decode_base64(&update.update_base64)?;
        let decoded = Update::decode_v1(bytes.as_slice()).map_err(|err| {
            SyncularError::protocol_message(format!(
                "decode Yjs update {}: {err}",
                update.update_id
            ))
        })?;
        txn.apply_update(decoded).map_err(|err| {
            SyncularError::protocol_message(format!("apply Yjs update {}: {err}", update.update_id))
        })?;
    }
    Ok(())
}

/// Returns `true` when `required` (base64 of a v1-encoded state vector)
/// denotes exactly `current`, whose own base64 encoding is `actual`.
/// An undecodable `required` never matches.
#[cfg(feature = "crdt-yjs")]
fn state_vector_base64_matches(required: &str, actual: &str, current: &StateVector) -> bool {
    // Fast path: identical encodings are trivially the same vector.
    if required == actual {
        return true;
    }
    // Entry order inside an encoded state vector is not guaranteed to agree
    // across encoders, so fall back to comparing the decoded vectors.
    let Ok(bytes) = decode_base64(required) else {
        return false;
    };
    StateVector::decode_v1(bytes.as_slice()).is_ok_and(|decoded| decoded == *current)
}
801
/// Returns the envelope's required base state vector, treating a blank
/// string the same as an absent one.
#[cfg(feature = "crdt-yjs")]
fn required_state_vector(update: &YjsUpdateEnvelope) -> Option<&str> {
    match update.requires_state_vector_base64.as_deref() {
        Some(required) if !required.trim().is_empty() => Some(required),
        _ => None,
    }
}
809
/// Rewrites the text container `container_key` to `next_text` with a single
/// replace of the span between the common prefix and common suffix, so
/// unchanged text keeps its CRDT identity.
///
/// Offsets passed to the text API are UTF-8 byte offsets (both boundary
/// helpers return byte positions).
#[cfg(feature = "crdt-yjs")]
fn patch_text(doc: &Doc, container_key: &str, next_text: &str) {
    let text = doc.get_or_insert_text(container_key);
    let existing = text.get_string(&doc.transact());
    if existing == next_text {
        return;
    }

    let start = common_prefix_boundary(&existing, next_text);
    let (old_end, new_end) = common_suffix_boundaries(&existing, next_text, start);
    let removed_len = old_end - start;
    let replacement = &next_text[start..new_end];

    let mut txn = doc.transact_mut();
    if removed_len != 0 {
        text.remove_range(&mut txn, start as u32, removed_len as u32);
    }
    if !replacement.is_empty() {
        text.insert(&mut txn, start as u32, replacement);
    }
}
835
/// Byte length of the longest common prefix of `left` and `right`, always on
/// a `char` boundary of both strings.
#[cfg(feature = "crdt-yjs")]
fn common_prefix_boundary(left: &str, right: &str) -> usize {
    left.char_indices()
        .zip(right.chars())
        .take_while(|((_, left_char), right_char)| left_char == right_char)
        .last()
        .map_or(0, |((offset, ch), _)| offset + ch.len_utf8())
}
849
/// Start offsets (in `left` and `right`) of their longest common suffix,
/// looking only past the first `prefix_len` bytes so the suffix never overlaps
/// the common prefix. Returns the string lengths when there is no common suffix.
#[cfg(feature = "crdt-yjs")]
fn common_suffix_boundaries(left: &str, right: &str, prefix_len: usize) -> (usize, usize) {
    let left_tail = &left[prefix_len..];
    let right_tail = &right[prefix_len..];
    left_tail
        .char_indices()
        .rev()
        .zip(right_tail.char_indices().rev())
        .take_while(|((_, left_char), (_, right_char))| left_char == right_char)
        .last()
        .map_or((left.len(), right.len()), |((left_offset, _), (right_offset, _))| {
            (prefix_len + left_offset, prefix_len + right_offset)
        })
}
869
/// Seeds a fresh text document from a plain string value of the rule's field.
///
/// The value is read from `payload` first, then from `existing_row`; empty
/// strings and non-text rule kinds are ignored.
#[cfg(feature = "crdt-yjs")]
fn seed_rule_value_from_rows(
    doc: &Doc,
    rule: &YjsFieldRule,
    payload: &Map<String, Value>,
    existing_row: Option<&Value>,
) {
    if !matches!(rule.kind, YjsFieldKind::Text) {
        return;
    }
    let from_payload = payload.get(&rule.field).and_then(Value::as_str);
    let seed = match from_payload {
        Some(text) => Some(text),
        None => existing_row
            .and_then(|row| row.get(&rule.field))
            .and_then(Value::as_str),
    };
    let Some(seed) = seed else {
        return;
    };
    if seed.is_empty() {
        return;
    }
    let container_key = rule.container_key.as_deref().unwrap_or(&rule.field);
    patch_text(doc, container_key, seed);
}
892
/// Reads the current string content of the Yjs container backing `rule`.
///
/// `Text` rules read the text root type. `XmlFragment` and `Prosemirror` rules read the
/// XML-fragment root type through its `GetString` rendering. The container key
/// defaults to the field name. The root type is obtained via `get_or_insert_*`, so a
/// missing container is created rather than reported.
///
/// # Errors
/// Currently never fails; the `Result` is kept for call-site compatibility.
#[cfg(feature = "crdt-yjs")]
fn materialize_rule_value(doc: &Doc, rule: &YjsFieldRule) -> Result<Value> {
    let container_key = rule.container_key.as_deref().unwrap_or(&rule.field);
    // In each arm the root type is resolved before the read transaction is opened.
    // `get_or_insert_*` may need to touch the document itself, so it must not overlap
    // an active transaction.
    let value = match rule.kind {
        YjsFieldKind::Text => {
            let text = doc.get_or_insert_text(container_key);
            let txn = doc.transact();
            text.get_string(&txn)
        }
        YjsFieldKind::XmlFragment | YjsFieldKind::Prosemirror => {
            let fragment = doc.get_or_insert_xml_fragment(container_key);
            let txn = doc.transact();
            fragment.get_string(&txn)
        }
    };
    Ok(Value::String(value))
}
917
918fn has_envelope(value: &Value, envelope_key: &str) -> bool {
919    value
920        .as_object()
921        .is_some_and(|object| object.contains_key(envelope_key))
922}
923
924fn with_operation_envelope(
925    local_row: Value,
926    operation_payload: Option<&Value>,
927    metadata: &AppTableMetadata,
928    envelope_key: &str,
929) -> Value {
930    if has_envelope(&local_row, envelope_key) {
931        return local_row;
932    }
933    let Some(envelope) = operation_payload.and_then(|payload| payload.get(envelope_key)) else {
934        return local_row;
935    };
936    let Value::Object(mut row) = local_row else {
937        return local_row;
938    };
939    row.insert(envelope_key.to_string(), envelope.clone());
940    strip_enveloped_materialized_fields(&mut row, metadata, envelope_key);
941    Value::Object(row)
942}
943
944fn merge_operation_payload_into_local_row(
945    local_row: Value,
946    operation_payload: &Value,
947    metadata: &AppTableMetadata,
948    envelope_key: &str,
949) -> Value {
950    match local_row {
951        Value::Object(mut row) => {
952            if let Value::Object(payload) = operation_payload {
953                for (key, value) in payload {
954                    row.insert(key.clone(), value.clone());
955                }
956            }
957            strip_enveloped_materialized_fields(&mut row, metadata, envelope_key);
958            Value::Object(row)
959        }
960        other => other,
961    }
962}
963
964fn strip_enveloped_materialized_fields(
965    row: &mut Map<String, Value>,
966    metadata: &AppTableMetadata,
967    envelope_key: &str,
968) {
969    let Some(envelope) = row.get(envelope_key).and_then(Value::as_object) else {
970        return;
971    };
972    let enveloped_fields = envelope.keys().cloned().collect::<Vec<_>>();
973    for field_name in enveloped_fields {
974        if let Some(field) = metadata
975            .crdt_yjs_fields
976            .iter()
977            .find(|candidate| candidate.field == field_name)
978        {
979            row.remove(field.field);
980            row.remove(field.state_column);
981        }
982    }
983}
984
/// Extracts a stored Yjs state value as an owned string.
///
/// Returns `None` when the value is absent, not a JSON string, or the empty string.
/// The content is passed through as-is; it is not checked to be valid base64 here.
#[cfg(feature = "crdt-yjs")]
fn state_value_to_base64(value: Option<&Value>) -> Option<String> {
    value
        .and_then(Value::as_str)
        .filter(|encoded| !encoded.is_empty())
        .map(str::to_owned)
}
992
/// Encodes `bytes` as standard (padded) base64.
#[cfg(feature = "crdt-yjs")]
fn encode_base64(bytes: &[u8]) -> String {
    let mut encoded = String::new();
    BASE64.encode_string(bytes, &mut encoded);
    encoded
}
997
/// Decodes a standard base64 string into raw bytes.
///
/// # Errors
/// Returns a protocol error that includes the decoder's message when `value` is not
/// valid standard base64.
#[cfg(feature = "crdt-yjs")]
fn decode_base64(value: &str) -> Result<Vec<u8>> {
    match BASE64.decode(value) {
        Ok(bytes) => Ok(bytes),
        Err(err) => Err(SyncularError::protocol_message(format!(
            "invalid base64 string: {err}"
        ))),
    }
}
1004
1005#[cfg(not(feature = "crdt-yjs"))]
1006fn crdt_yjs_feature_disabled() -> SyncularError {
1007    SyncularError::config("CRDT Yjs support is not enabled in this Syncular runtime build")
1008}
1009
#[cfg(all(test, feature = "crdt-yjs"))]
mod tests {
    use super::*;
    use serde_json::json;

    /// CRDT metadata for a single server-merged `title` text field whose Yjs state
    /// lives in `title_yjs_state`.
    static TITLE_CRDT_FIELDS: &[CrdtYjsFieldMetadata] = &[CrdtYjsFieldMetadata {
        field: "title",
        state_column: "title_yjs_state",
        container_key: "title",
        row_id_field: "id",
        kind: "text",
        sync_mode: "server-merge",
    }];

    /// Minimal `tasks` table metadata whose only CRDT field is `title`.
    static TASKS_TABLE: AppTableMetadata = AppTableMetadata {
        name: "tasks",
        primary_key_column: "id",
        server_version_column: "server_version",
        soft_delete_column: None,
        subscription_id: "tasks",
        columns: &[],
        blob_columns: &[],
        crdt_yjs_fields: TITLE_CRDT_FIELDS,
        encrypted_fields: &[],
        scopes: &[],
    };

    /// Arguments for `build_yjs_text_update` with an explicit update id.
    fn text_args(
        previous_state_base64: Option<String>,
        next_text: &str,
        container_key: Option<&str>,
        update_id: &str,
    ) -> BuildYjsTextUpdateArgs {
        BuildYjsTextUpdateArgs {
            previous_state_base64,
            next_text: next_text.to_string(),
            container_key: container_key.map(str::to_string),
            update_id: Some(update_id.to_string()),
        }
    }

    /// Rule binding `tasks.title` to the `title` Yjs text container.
    fn title_rule() -> YjsFieldRule {
        YjsFieldRule {
            table: "tasks".to_string(),
            field: "title".to_string(),
            state_column: "title_yjs_state".to_string(),
            container_key: Some("title".to_string()),
            row_id_field: Some("id".to_string()),
            kind: YjsFieldKind::Text,
        }
    }

    /// Envelope-application arguments for row `task-1` of `tasks` with only the
    /// `title` rule. `envelope_key`, `strict` and `strip_envelope` are left unset (`None`).
    fn title_envelope_args(
        payload: Value,
        existing_row: Option<Value>,
    ) -> ApplyYjsEnvelopeToPayloadArgs {
        ApplyYjsEnvelopeToPayloadArgs {
            table: "tasks".to_string(),
            row_id: Some("task-1".to_string()),
            payload,
            existing_row,
            rules: vec![title_rule()],
            envelope_key: None,
            strict: None,
            strip_envelope: None,
        }
    }

    #[test]
    fn text_updates_merge_concurrent_changes() -> Result<()> {
        let base = build_yjs_text_update(text_args(None, "middle", None, "base"))?;
        let prepend = build_yjs_text_update(text_args(
            Some(base.next_state_base64.clone()),
            "left middle",
            None,
            "prepend",
        ))?;
        let append = build_yjs_text_update(text_args(
            Some(base.next_state_base64.clone()),
            "middle right",
            None,
            "append",
        ))?;

        // Applying the two concurrent edits in either order must converge.
        let forward = apply_yjs_text_updates(ApplyYjsTextUpdatesArgs {
            previous_state_base64: Some(base.next_state_base64.clone()),
            updates: vec![prepend.update.clone(), append.update.clone()],
            container_key: None,
        })?;
        let reverse = apply_yjs_text_updates(ApplyYjsTextUpdatesArgs {
            previous_state_base64: Some(base.next_state_base64),
            updates: vec![append.update, prepend.update],
            container_key: None,
        })?;

        assert_eq!(forward.text, reverse.text);
        for fragment in ["left", "middle", "right"] {
            assert!(
                forward.text.contains(fragment),
                "merged text lacks {fragment:?}: {:?}",
                forward.text
            );
        }
        Ok(())
    }

    #[test]
    fn envelope_materializes_payload_and_strips_transport_key() -> Result<()> {
        let base = build_yjs_text_update(text_args(None, "hello", Some("title"), "base"))?;
        let next = build_yjs_text_update(text_args(
            Some(base.next_state_base64.clone()),
            "hello world",
            Some("title"),
            "next",
        ))?;

        let result = apply_yjs_envelope_to_payload(title_envelope_args(
            json!({ "__yjs": { "title": next.update } }),
            Some(json!({
                "id": "task-1",
                "title": "hello",
                "title_yjs_state": base.next_state_base64
            })),
        ))?;

        assert_eq!(result.payload["title"], "hello world");
        assert!(result.payload["title_yjs_state"].as_str().is_some());
        assert!(result.payload.get(YJS_PAYLOAD_KEY).is_none());
        Ok(())
    }

    #[test]
    fn initial_yjs_envelope_does_not_duplicate_plain_payload_text() -> Result<()> {
        let initial = build_yjs_text_update(text_args(None, "Draft", Some("title"), "initial"))?;

        let result = apply_yjs_envelope_to_payload(title_envelope_args(
            json!({
                "title": "Draft",
                "__yjs": { "title": initial.update }
            }),
            None,
        ))?;

        assert_eq!(result.payload["title"], "Draft");
        assert!(result.payload["title_yjs_state"].as_str().is_some());
        assert!(result.payload.get(YJS_PAYLOAD_KEY).is_none());
        Ok(())
    }

    #[test]
    fn diff_envelope_remote_rows_preserve_non_crdt_payload_fields() -> Result<()> {
        let base = build_yjs_text_update(text_args(None, "hello", Some("title"), "base"))?;
        let next = build_yjs_text_update(text_args(
            Some(base.next_state_base64.clone()),
            "hello world",
            Some("title"),
            "next",
        ))?;
        // Declare the state vector this update was built on top of.
        let mut next_update = next.update;
        next_update.requires_state_vector_base64 =
            Some(yjs_state_vector_base64(Some(&base.next_state_base64))?);

        let row = transform_local_row_for_metadata(
            "tasks",
            "task-1",
            None,
            Some(&json!({
                "updated_at": 2,
                "__yjs": { "title": next_update }
            })),
            Some(&json!({
                "id": "task-1",
                "title": "hello",
                "title_yjs_state": base.next_state_base64,
                "updated_at": 1
            })),
            &TASKS_TABLE,
        )?
        .expect("diff envelope materializes existing row");

        assert_eq!(row["title"], "hello world");
        assert_eq!(row["updated_at"], 2);
        assert!(row["title_yjs_state"].as_str().is_some());
        assert!(row.get(YJS_PAYLOAD_KEY).is_none());
        Ok(())
    }

    #[test]
    fn diff_envelope_without_required_local_base_requests_resync() -> Result<()> {
        let base = build_yjs_text_update(text_args(None, "hello", Some("title"), "base"))?;
        let next = build_yjs_text_update(text_args(
            Some(base.next_state_base64.clone()),
            "hello world",
            Some("title"),
            "next",
        ))?;
        let mut next_update = next.update;
        next_update.requires_state_vector_base64 =
            Some(yjs_state_vector_base64(Some(&base.next_state_base64))?);

        // No existing local row is supplied, so the required base state is missing.
        let err = transform_local_row_for_metadata(
            "tasks",
            "task-1",
            None,
            Some(&json!({
                "__yjs": { "title": next_update }
            })),
            None,
            &TASKS_TABLE,
        )
        .expect_err("server diff without local base must request resync");

        let message = err.to_string();
        for needle in ["tasks", "task-1", "title", "full snapshot resync required"] {
            assert!(
                message.contains(needle),
                "error message lacks {needle:?}: {message}"
            );
        }
        Ok(())
    }
}
1254
#[cfg(all(test, not(feature = "crdt-yjs")))]
mod tests_without_crdt_yjs {
    use super::*;

    /// Without the `crdt-yjs` feature, `build_yjs_text_update` must fail with the
    /// "feature disabled" configuration error.
    #[test]
    fn yjs_operations_report_disabled_feature() {
        let args = BuildYjsTextUpdateArgs {
            previous_state_base64: None,
            next_text: String::from("hello"),
            container_key: None,
            update_id: None,
        };
        let message = build_yjs_text_update(args)
            .expect_err("CRDT/Yjs operation should require crdt-yjs feature")
            .to_string();

        assert!(
            message.contains("CRDT Yjs support is not enabled"),
            "unexpected error: {message}"
        );
    }
}