//! syncular-runtime 0.1.0
//!
//! Shared Rust runtime for Syncular SQLite-backed native and browser clients.
use crate::app_schema::{AppTableMetadata, CrdtYjsFieldMetadata};
use crate::crdt_yjs::YjsUpdateEnvelope;
use crate::encryption::{
    FieldEncryptionContext, FieldEncryptionKeyProvider, StaticFieldEncryptionKeys,
};
use crate::error::{Result, SyncularError};
use crate::protocol::{PendingSyncularMutation, PullResponse, SyncChange};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::collections::BTreeMap;
use std::sync::Arc;

/// Table name of the encrypted CRDT update system table; its identity column
/// is `update_id` (see `encrypted_crdt_identity_column`).
pub const CRDT_UPDATES_TABLE: &str = "sync_crdt_updates";
/// Table name of the encrypted CRDT checkpoint system table; its identity
/// column is `checkpoint_id` (see `encrypted_crdt_identity_column`).
pub const CRDT_CHECKPOINTS_TABLE: &str = "sync_crdt_checkpoints";

pub fn is_encrypted_crdt_system_table(table: &str) -> bool {
    matches!(table, CRDT_UPDATES_TABLE | CRDT_CHECKPOINTS_TABLE)
}

pub fn encrypted_crdt_identity_column(table: &str) -> Result<&'static str> {
    match table {
        CRDT_UPDATES_TABLE => Ok("update_id"),
        CRDT_CHECKPOINTS_TABLE => Ok("checkpoint_id"),
        _ => Err(SyncularError::config(format!(
            "unknown encrypted CRDT system table: {table}"
        ))),
    }
}

/// Validates an encrypted CRDT system-table row and returns an owned,
/// normalized copy of it.
///
/// * `table` - the system table the row belongs to; selects the identity
///   column and whether the checkpoint-only `covers_seq` check applies.
/// * `row_id` - value written into the identity column when the row does not
///   already contain that key.
/// * `row` - the incoming payload; `None` is treated as an empty object.
///
/// # Errors
///
/// * protocol error when `row` is present but is not a JSON object;
/// * configuration error when `table` is not an encrypted CRDT system table;
/// * protocol error when any required field is missing, not a string, or
///   empty;
/// * protocol error when, for the checkpoint table, `covers_seq` is not an
///   integer representable as `i64` or is negative.
pub fn encrypted_crdt_normalize_row(
    table: &str,
    row_id: &str,
    row: Option<&Value>,
) -> Result<Map<String, Value>> {
    // The payload shape is checked before the table name, so a non-object row
    // is reported even when the table is unknown.
    let mut normalized = match row {
        None => Map::new(),
        Some(Value::Object(fields)) => fields.clone(),
        Some(other) => {
            return Err(SyncularError::protocol_message(format!(
                "encrypted CRDT row for {table} is not an object: {other}"
            )));
        }
    };

    let identity_column = encrypted_crdt_identity_column(table)?;
    // Only fill the identity column in when the key is absent; an existing
    // value (even an invalid one) is left for the validation below to judge.
    if !normalized.contains_key(identity_column) {
        normalized.insert(
            identity_column.to_string(),
            Value::String(row_id.to_string()),
        );
    }

    // Validation order determines which error a caller sees first.
    let required_fields = [
        "stream_id",
        "app_table",
        "row_id",
        "field_name",
        identity_column,
        "key_id",
        "ciphertext",
    ];
    for name in required_fields {
        required_string(&normalized, name)?;
    }

    if table != CRDT_CHECKPOINTS_TABLE {
        return Ok(normalized);
    }

    let covers_seq = normalized.get("covers_seq").and_then(Value::as_i64);
    match covers_seq {
        None => Err(SyncularError::protocol_message(
            "encrypted CRDT checkpoint covers_seq must be an integer",
        )),
        Some(seq) if seq < 0 => Err(SyncularError::protocol_message(
            "encrypted CRDT checkpoint covers_seq must be non-negative",
        )),
        Some(_) => Ok(normalized),
    }
}

/// Serializes the row's `scopes` value to a JSON string.
///
/// A row without a `scopes` key yields the empty object `{}`. The value is
/// serialized by reference, so the scopes are never deep-cloned.
///
/// # Errors
///
/// Returns a protocol error when `scopes` is present but is not a JSON
/// object, and propagates any serialization error from `serde_json`.
pub fn encrypted_crdt_scopes_json(row: &Map<String, Value>) -> Result<String> {
    match row.get("scopes") {
        // Missing scopes serialize exactly like an empty JSON object.
        None => Ok(String::from("{}")),
        Some(scopes @ Value::Object(_)) => Ok(serde_json::to_string(scopes)?),
        Some(_) => Err(SyncularError::protocol_message(
            "encrypted CRDT scopes must be an object",
        )),
    }
}

/// Static configuration for the encrypted CRDT layer.
///
/// (De)serialized with camelCase field names. The optional fields default to
/// `None` when absent on input and are omitted from output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StaticEncryptedCrdtConfig {
    /// Key material handed to `StaticFieldEncryptionKeys::from_key_material`.
    /// NOTE(review): presumably maps key id -> encoded key; confirm against
    /// the encryption module.
    pub keys: BTreeMap<String, String>,
    /// Optional key id passed alongside `keys` to
    /// `StaticFieldEncryptionKeys::from_key_material`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub encryption_kid: Option<String>,
    /// Optional partition id. Not read by any code visible in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub partition_id: Option<String>,
}

/// Handle for the encrypted CRDT layer.
///
/// In this file the handle only stores a partition id; every transform and
/// mutation-building method on it returns the "E2EE support is not enabled"
/// configuration error.
#[derive(Clone)]
pub struct EncryptedCrdt {
    /// Partition id; `"default"` when constructed through `new`.
    partition_id: String,
}

/// Counters describing one encrypted CRDT stream.
///
/// (De)serialized with camelCase field names; `Default` yields zero counts
/// and `None` for both optional sequence numbers.
/// NOTE(review): the field meanings below are inferred from their names, as
/// nothing in this file populates the struct; confirm against the producer.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct EncryptedCrdtStreamStats {
    /// Presumably the number of stored update rows for the stream.
    pub update_count: i64,
    /// Presumably the number of stored checkpoint rows for the stream.
    pub checkpoint_count: i64,
    /// Presumably the number of updates eligible to be folded into a checkpoint.
    pub checkpointable_update_count: i64,
    /// Presumably the highest server sequence seen, if any.
    pub max_server_seq: Option<i64>,
    /// Presumably the `covers_seq` of the most recent checkpoint, if any.
    pub latest_checkpoint_covers_seq: Option<i64>,
}

/// Arguments for `EncryptedCrdt::build_text_update_mutation`.
#[derive(Debug, Clone)]
pub struct BuildEncryptedCrdtTextUpdateArgs<'a> {
    /// Field-encryption context for the operation.
    pub ctx: FieldEncryptionContext,
    /// Static metadata of the application table being written.
    pub metadata: &'static AppTableMetadata,
    /// Name of the CRDT field on the application table.
    pub field: &'a str,
    /// Id of the application row.
    pub row_id: &'a str,
    /// NOTE(review): presumably the current local row contents; confirm
    /// against the E2EE-enabled implementation.
    pub existing_row: &'a Value,
    /// NOTE(review): presumably the desired full text after the edit; confirm.
    pub next_text: &'a str,
}

/// Arguments for `EncryptedCrdt::build_yjs_update_mutation`.
#[derive(Debug, Clone)]
pub struct BuildEncryptedCrdtYjsUpdateArgs<'a> {
    /// Field-encryption context for the operation.
    pub ctx: FieldEncryptionContext,
    /// Static metadata of the application table being written.
    pub metadata: &'static AppTableMetadata,
    /// Name of the CRDT field on the application table.
    pub field: &'a str,
    /// Id of the application row.
    pub row_id: &'a str,
    /// NOTE(review): presumably the current local row contents; confirm
    /// against the E2EE-enabled implementation.
    pub existing_row: &'a Value,
    /// The Yjs update envelope to turn into a mutation.
    pub update: YjsUpdateEnvelope,
}

/// Arguments for `EncryptedCrdt::build_checkpoint_mutation`.
#[derive(Debug, Clone)]
pub struct BuildEncryptedCrdtCheckpointArgs<'a> {
    /// Field-encryption context for the operation.
    pub ctx: FieldEncryptionContext,
    /// Static metadata of the application table being written.
    pub metadata: &'static AppTableMetadata,
    /// Name of the CRDT field on the application table.
    pub field: &'a str,
    /// Id of the application row.
    pub row_id: &'a str,
    /// NOTE(review): presumably the current local row contents; confirm
    /// against the E2EE-enabled implementation.
    pub existing_row: &'a Value,
    /// NOTE(review): presumably the highest update sequence the checkpoint
    /// covers; row validation in this file requires a checkpoint row's
    /// `covers_seq` to be non-negative.
    pub covers_seq: i64,
}

impl EncryptedCrdt {
    pub fn new(_keys: Arc<dyn FieldEncryptionKeyProvider>) -> Self {
        Self {
            partition_id: "default".to_string(),
        }
    }

    pub fn with_partition_id(
        _keys: Arc<dyn FieldEncryptionKeyProvider>,
        partition_id: impl Into<String>,
    ) -> Result<Self> {
        let partition_id = partition_id.into();
        if partition_id.trim().is_empty() {
            return Err(SyncularError::config(
                "encrypted CRDT partition_id cannot be empty",
            ));
        }
        Ok(Self { partition_id })
    }

    pub fn from_static_config(config: StaticEncryptedCrdtConfig) -> Result<Self> {
        let _ = StaticFieldEncryptionKeys::from_key_material(config.keys, config.encryption_kid)?;
        Err(e2ee_feature_disabled())
    }

    pub fn from_static_config_json(config_json: &str) -> Result<Option<Self>> {
        let trimmed = config_json.trim();
        if trimmed.is_empty() || trimmed == "null" {
            return Ok(None);
        }
        Err(e2ee_feature_disabled())
    }

    pub fn partition_id(&self) -> &str {
        &self.partition_id
    }

    pub fn transform_pull_response(&self, _response: PullResponse) -> Result<PullResponse> {
        Err(e2ee_feature_disabled())
    }

    pub fn transform_change(&self, _change: SyncChange) -> Result<SyncChange> {
        Err(e2ee_feature_disabled())
    }

    pub fn build_text_update_mutation(
        &self,
        _args: BuildEncryptedCrdtTextUpdateArgs<'_>,
    ) -> Result<PendingSyncularMutation> {
        Err(e2ee_feature_disabled())
    }

    pub fn build_yjs_update_mutation(
        &self,
        _args: BuildEncryptedCrdtYjsUpdateArgs<'_>,
    ) -> Result<PendingSyncularMutation> {
        Err(e2ee_feature_disabled())
    }

    pub fn build_checkpoint_mutation(
        &self,
        _args: BuildEncryptedCrdtCheckpointArgs<'_>,
    ) -> Result<PendingSyncularMutation> {
        Err(e2ee_feature_disabled())
    }
}

pub fn encrypted_crdt_stream_id(table: &str, row_id: &str, field: &str) -> String {
    format!(
        "{}:{}:{}",
        escape_stream_part(table),
        escape_stream_part(row_id),
        escape_stream_part(field)
    )
}

/// Returns `true` when the field's `sync_mode` is exactly
/// `"encrypted-update-log"`.
pub fn is_encrypted_update_log_field(field: &CrdtYjsFieldMetadata) -> bool {
    const ENCRYPTED_UPDATE_LOG_MODE: &str = "encrypted-update-log";
    field.sync_mode == ENCRYPTED_UPDATE_LOG_MODE
}

/// Looks up the Yjs field metadata for `field_name` on `metadata`.
///
/// Only fields accepted by `is_encrypted_update_log_field` are considered;
/// the first match in `crdt_yjs_fields` order is returned.
///
/// # Errors
///
/// Returns a configuration error naming `<table>.<field>` when no such field
/// exists.
pub fn encrypted_field_metadata(
    metadata: &'static AppTableMetadata,
    field_name: &str,
) -> Result<&'static CrdtYjsFieldMetadata> {
    for candidate in metadata.crdt_yjs_fields.iter() {
        if candidate.field == field_name && is_encrypted_update_log_field(candidate) {
            return Ok(candidate);
        }
    }
    Err(SyncularError::config(format!(
        "no encrypted CRDT Yjs field metadata for {}.{field_name}",
        metadata.name
    )))
}

/// Returns an owned copy of the row's `update_base64` value.
///
/// Yields `None` when the key is missing, the value is not a string, or the
/// string is empty.
pub fn encrypted_crdt_plaintext_update_base64(row: &Map<String, Value>) -> Option<String> {
    match row.get("update_base64") {
        Some(Value::String(encoded)) if !encoded.is_empty() => Some(encoded.clone()),
        _ => None,
    }
}

/// Returns an owned copy of the row's `state_base64` value.
///
/// Yields `None` when the key is missing, the value is not a string, or the
/// string is empty.
pub fn encrypted_crdt_plaintext_state_base64(row: &Map<String, Value>) -> Option<String> {
    let Some(Value::String(encoded)) = row.get("state_base64") else {
        return None;
    };
    if encoded.is_empty() {
        None
    } else {
        Some(encoded.clone())
    }
}

/// Stub for applying decrypted CRDT plaintext to an application row.
///
/// Every argument is ignored in this build.
///
/// # Errors
///
/// Always returns the "E2EE support is not enabled" configuration error.
pub fn apply_encrypted_crdt_plaintext_to_row(
    _metadata: &'static AppTableMetadata,
    _field_name: &str,
    _app_row_id: &str,
    _system_table: &str,
    _system_row: &Map<String, Value>,
    _current_row: Option<Value>,
) -> Result<Option<Value>> {
    Err(e2ee_feature_disabled())
}

/// Checks whether a row's stored `scopes` satisfy every requested scope.
///
/// * An empty `scopes` request matches any row.
/// * Otherwise the row must carry a `scopes` object containing every
///   requested key.
/// * A requested array matches when any of its elements equals the stored
///   value; any other requested value must equal the stored value exactly.
pub fn encrypted_crdt_row_matches_scopes(
    row: &Map<String, Value>,
    scopes: &Map<String, Value>,
) -> bool {
    if scopes.is_empty() {
        return true;
    }
    let stored_scopes = match row.get("scopes") {
        Some(Value::Object(stored)) => stored,
        _ => return false,
    };
    for (key, requested) in scopes {
        let satisfied = match (stored_scopes.get(key), requested) {
            (None, _) => false,
            (Some(stored), Value::Array(candidates)) => candidates.contains(stored),
            (Some(stored), exact) => stored == exact,
        };
        if !satisfied {
            return false;
        }
    }
    true
}

/// Extracts `field` from `row` as an owned, non-empty string.
///
/// # Errors
///
/// Returns a protocol error naming `field` when the key is missing, the
/// value is not a string, or the string is empty.
fn required_string(row: &Map<String, Value>, field: &str) -> Result<String> {
    match row.get(field) {
        Some(Value::String(text)) if !text.is_empty() => Ok(text.clone()),
        _ => Err(SyncularError::protocol_message(format!(
            "encrypted CRDT payload field {field} must be a non-empty string"
        ))),
    }
}

/// Percent-encodes one component of a stream id.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` are copied through unchanged.
/// Every other byte of the UTF-8 encoding (including `:` and `%`) is written
/// as `%` followed by two uppercase hexadecimal digits.
fn escape_stream_part(value: &str) -> String {
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    // The output is at least as long as the input; reserving that much avoids
    // regrowth in the common case where nothing needs escaping.
    let mut out = String::with_capacity(value.len());
    for byte in value.bytes() {
        if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
            out.push(char::from(byte));
        } else {
            // Emit the two hex digits directly instead of allocating a
            // temporary `String` through `format!` for every escaped byte.
            out.push('%');
            out.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
            out.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
        }
    }
    out
}

/// Builds the configuration error returned by every operation in this file
/// that would need E2EE support.
fn e2ee_feature_disabled() -> SyncularError {
    SyncularError::config("E2EE support is not enabled in this Syncular runtime build")
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A non-empty, non-`null` JSON config must be rejected with the
    /// "E2EE support is not enabled" error rather than silently accepted.
    #[test]
    fn non_null_config_reports_disabled_feature() {
        // `EncryptedCrdt` has no `Debug` impl, so `expect_err` is unavailable.
        let Err(err) = EncryptedCrdt::from_static_config_json("{}") else {
            panic!("non-null encrypted CRDT config should require e2ee feature");
        };
        let message = err.to_string();
        assert!(message.contains("E2EE support is not enabled"));
    }
}