use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use sha2::{Digest, Sha256};
use std::fmt::Write as _;
pub mod auth_lease;
pub mod binary_snapshot;
pub mod binary_sync_pack;
pub mod blob;
pub mod error;
pub mod integrity;
pub mod realtime;
pub mod snapshot_artifact;
pub mod snapshot_chunk;
pub mod snapshot_manifest;
pub mod validation;
pub use auth_lease::{
AuthLeaseCapabilities, AuthLeaseIssueRequest, AuthLeaseIssueResponse, AuthLeasePayload,
AuthLeaseProtectedHeader, AuthLeaseProvenance, AuthLeaseScope, AuthLeaseValidationResult,
AUTH_LEASE_ALG_ES256, AUTH_LEASE_CODE_BUSINESS_REJECTED, AUTH_LEASE_CODE_EXPIRED,
AUTH_LEASE_CODE_INVALID, AUTH_LEASE_CODE_MISSING, AUTH_LEASE_CODE_SCHEMA_MISMATCH,
AUTH_LEASE_CODE_SCOPE_MISMATCH, AUTH_LEASE_CODE_SCOPE_REVOKED, AUTH_LEASE_PROTOCOL_VERSION,
AUTH_LEASE_TYP, AUTH_LEASE_VERSION,
};
pub use blob::{
blob_hash, normalize_blob_mime_type, validate_blob_bytes, validate_blob_digest,
validate_blob_hash, validate_blob_ref, BlobDownloadUrlResponse, BlobRef,
BlobUploadCompleteResponse, BlobUploadInitRequest, BlobUploadInitResponse,
};
pub use error::{ProtocolError, Result};
pub use integrity::{
validate_pull_commit_integrity_metadata, verify_subscription_commit_integrity,
wire_commit_chain_root, wire_commit_chain_root_from_digest, wire_commit_digest,
VerifiedCommitRoot,
};
pub use realtime::{
realtime_presence_event_from_value, realtime_push_response_from_value, RealtimePresenceEntry,
RealtimePresenceEvent, RealtimePresenceRequest, RealtimePushRequest, RealtimePushResponseData,
RealtimeServerMessage, REALTIME_CLIENT_MESSAGE_PRESENCE, REALTIME_CLIENT_MESSAGE_PUSH,
REALTIME_SERVER_EVENT_PRESENCE, REALTIME_SERVER_EVENT_PUSH_RESPONSE,
REALTIME_SERVER_EVENT_SYNC,
};
pub use snapshot_artifact::{
scoped_snapshot_artifact_manifest_digest, validate_scoped_snapshot_artifact_manifest,
validate_scoped_snapshot_artifact_ref, ScopedSnapshotArtifactManifest,
ScopedSnapshotArtifactRef, SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1,
SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION, SNAPSHOT_ARTIFACT_COMPRESSION_NONE,
};
pub use snapshot_chunk::{
decode_snapshot_chunk_sha256, validate_snapshot_chunk_format,
validate_snapshot_chunk_hash_bytes, validate_snapshot_chunk_hash_hex,
SNAPSHOT_CHUNK_COMPRESSION_GZIP,
};
pub use snapshot_manifest::{snapshot_manifest_digest, validate_pull_snapshot_manifests};
pub use validation::{
validate_combined_request, validate_combined_response, validate_realtime_presence_request,
validate_realtime_push_request, validate_realtime_server_message,
};
pub const COMMIT_INTEGRITY_HEX_LENGTH: usize = 64;
pub const COMMIT_INTEGRITY_GENESIS_ROOT: &str =
"0000000000000000000000000000000000000000000000000000000000000000";
pub const WIRE_COMMIT_DIGEST_VERSION: &str = "syncular-wire-commit-digest-v1";
pub const WIRE_COMMIT_CHAIN_ROOT_VERSION: &str = "syncular-wire-commit-chain-root-v1";
pub const SNAPSHOT_CHUNK_ENCODING_BINARY_TABLE_V1: &str = "binary-table-v1";
pub const SYNC_PACK_ENCODING_BINARY_V1: &str = "binary-sync-pack-v1";
pub const SYNC_PACK_CONTENT_TYPE: &str = "application/vnd.syncular.sync-pack.v1";
pub const BINARY_SYNC_PACK_WIRE_VERSION: u16 = 14;
pub const SNAPSHOT_MANIFEST_VERSION: i32 = 1;
pub type ScopeValues = Map<String, Value>;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncOperation {
pub table: String,
pub row_id: String,
pub op: String,
pub payload: Option<Value>,
pub base_version: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushCommitRequest {
#[serde(rename = "clientCommitId")]
pub client_commit_id: String,
pub operations: Vec<SyncOperation>,
#[serde(rename = "schemaVersion")]
pub schema_version: i32,
#[serde(rename = "authLease", default, skip_serializing_if = "Option::is_none")]
pub auth_lease: Option<AuthLeaseProvenance>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushBatchRequest {
pub commits: Vec<PushCommitRequest>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BootstrapState {
#[serde(rename = "asOfCommitSeq")]
pub as_of_commit_seq: i64,
pub tables: Vec<String>,
#[serde(rename = "tableIndex")]
pub table_index: i64,
#[serde(rename = "rowCursor")]
pub row_cursor: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CrdtStateVectorHint {
#[serde(rename = "rowId")]
pub row_id: String,
pub field: String,
#[serde(rename = "stateColumn")]
pub state_column: String,
#[serde(rename = "stateVectorBase64")]
pub state_vector_base64: String,
#[serde(rename = "syncMode")]
pub sync_mode: String,
#[serde(rename = "updatedAt")]
pub updated_at: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionRequest {
pub id: String,
pub table: String,
pub scopes: ScopeValues,
#[serde(default, skip_serializing_if = "Map::is_empty")]
pub params: Map<String, Value>,
pub cursor: i64,
#[serde(rename = "bootstrapState", skip_serializing_if = "Option::is_none")]
pub bootstrap_state: Option<BootstrapState>,
#[serde(rename = "verifiedRoot", skip_serializing_if = "Option::is_none")]
pub verified_root: Option<String>,
#[serde(rename = "crdtStateVectors")]
pub crdt_state_vectors: Vec<CrdtStateVectorHint>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotArtifactsRequest {
#[serde(rename = "artifactKinds")]
pub artifact_kinds: Vec<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub compressions: Vec<String>,
#[serde(rename = "featureSet", default, skip_serializing_if = "Vec::is_empty")]
pub feature_set: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PullRequest {
#[serde(rename = "schemaVersion")]
pub schema_version: i32,
#[serde(rename = "limitCommits")]
pub limit_commits: i64,
#[serde(rename = "limitSnapshotRows")]
pub limit_snapshot_rows: i64,
#[serde(rename = "maxSnapshotPages")]
pub max_snapshot_pages: i64,
#[serde(rename = "dedupeRows", skip_serializing_if = "Option::is_none")]
pub dedupe_rows: Option<bool>,
#[serde(rename = "snapshotArtifacts", skip_serializing_if = "Option::is_none")]
pub snapshot_artifacts: Option<SnapshotArtifactsRequest>,
pub subscriptions: Vec<SubscriptionRequest>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CombinedRequest {
#[serde(rename = "clientId")]
pub client_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub push: Option<PushBatchRequest>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pull: Option<PullRequest>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CombinedResponse {
pub ok: bool,
#[serde(
rename = "requiredSchemaVersion",
skip_serializing_if = "Option::is_none"
)]
pub required_schema_version: Option<i32>,
#[serde(
rename = "latestSchemaVersion",
skip_serializing_if = "Option::is_none"
)]
pub latest_schema_version: Option<i32>,
pub push: Option<PushBatchResponse>,
pub pull: Option<PullResponse>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushBatchResponse {
pub ok: bool,
pub commits: Vec<PushCommitResponse>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PushCommitResponse {
#[serde(rename = "clientCommitId")]
pub client_commit_id: String,
pub status: String,
#[serde(rename = "commitSeq")]
pub commit_seq: Option<i64>,
pub results: Vec<OperationResult>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationResult {
#[serde(rename = "opIndex")]
pub op_index: i32,
pub status: String,
pub message: Option<String>,
pub error: Option<String>,
pub code: Option<String>,
pub retriable: Option<bool>,
#[serde(rename = "server_version")]
pub server_version: Option<i64>,
#[serde(rename = "server_row")]
pub server_row: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PullResponse {
pub ok: bool,
pub subscriptions: Vec<SubscriptionResponse>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionResponse {
pub id: String,
pub status: String,
pub scopes: ScopeValues,
pub bootstrap: bool,
#[serde(rename = "bootstrapState")]
pub bootstrap_state: Option<BootstrapState>,
#[serde(rename = "nextCursor")]
pub next_cursor: i64,
pub integrity: Option<SubscriptionIntegrity>,
pub commits: Vec<SyncCommit>,
pub snapshots: Option<Vec<SyncSnapshot>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SubscriptionIntegrity {
#[serde(rename = "partitionId")]
pub partition_id: String,
#[serde(rename = "previousChainRoot")]
pub previous_chain_root: String,
#[serde(rename = "commitChainRoot")]
pub commit_chain_root: String,
#[serde(rename = "commitSeq")]
pub commit_seq: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SyncCommit {
#[serde(rename = "commitSeq")]
pub commit_seq: i64,
#[serde(rename = "createdAt")]
pub created_at: String,
#[serde(rename = "actorId")]
pub actor_id: String,
pub changes: Vec<SyncChange>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncChange {
pub table: String,
pub row_id: String,
pub op: String,
pub row_json: Option<Value>,
pub row_version: Option<i64>,
pub scopes: ScopeValues,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncSnapshot {
pub table: String,
pub rows: Vec<Value>,
pub chunks: Option<Vec<SnapshotChunkRef>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub artifacts: Option<Vec<ScopedSnapshotArtifactRef>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub manifest: Option<SnapshotManifest>,
#[serde(rename = "isFirstPage")]
pub is_first_page: bool,
#[serde(rename = "isLastPage")]
pub is_last_page: bool,
#[serde(rename = "bootstrapStateAfter")]
pub bootstrap_state_after: Option<BootstrapState>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotChunkRef {
pub id: String,
#[serde(rename = "byteLength")]
pub byte_length: i64,
pub sha256: String,
pub encoding: String,
pub compression: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotManifest {
pub version: i32,
pub digest: String,
pub table: String,
#[serde(rename = "asOfCommitSeq")]
pub as_of_commit_seq: i64,
#[serde(rename = "scopeDigest")]
pub scope_digest: String,
#[serde(rename = "rowCursor")]
pub row_cursor: Option<String>,
#[serde(rename = "rowLimit")]
pub row_limit: i64,
#[serde(rename = "nextRowCursor")]
pub next_row_cursor: Option<String>,
#[serde(rename = "isFirstPage")]
pub is_first_page: bool,
#[serde(rename = "isLastPage")]
pub is_last_page: bool,
pub chunks: Vec<SnapshotManifestChunkRef>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotManifestChunkRef {
pub id: String,
#[serde(rename = "byteLength")]
pub byte_length: i64,
pub sha256: String,
pub encoding: String,
pub compression: String,
}
pub fn sha256_hex(value: &str) -> String {
hex::encode(Sha256::digest(value.as_bytes()))
}
pub fn canonical_json_string(value: &Value) -> Result<String> {
let mut out = String::new();
append_canonical_json(&mut out, value)?;
Ok(out)
}
pub fn append_canonical_json(out: &mut String, value: &Value) -> Result<()> {
match value {
Value::Null => out.push_str("null"),
Value::Bool(value) => out.push_str(if *value { "true" } else { "false" }),
Value::Number(value) => write!(out, "{value}").expect("writing to String should not fail"),
Value::String(value) => append_json_string(out, value)?,
Value::Array(values) => {
out.push('[');
for (index, item) in values.iter().enumerate() {
if index > 0 {
out.push(',');
}
append_canonical_json(out, item)?;
}
out.push(']');
}
Value::Object(values) => {
append_canonical_object(out, values)?;
}
}
Ok(())
}
pub fn append_canonical_object(out: &mut String, values: &Map<String, Value>) -> Result<()> {
out.push('{');
let body_start = out.len();
let mut previous: Option<&str> = None;
for (index, (key, value)) in values.iter().enumerate() {
if let Some(previous) = previous {
if previous > key.as_str() {
out.truncate(body_start);
append_canonical_object_sorted_body(out, values)?;
out.push('}');
return Ok(());
}
}
if index > 0 {
out.push(',');
}
append_json_string(out, key)?;
out.push(':');
append_canonical_json(out, value)?;
previous = Some(key.as_str());
}
out.push('}');
Ok(())
}
fn append_canonical_object_sorted_body(
out: &mut String,
values: &Map<String, Value>,
) -> Result<()> {
let mut keys = values.keys().collect::<Vec<_>>();
keys.sort();
for (index, key) in keys.into_iter().enumerate() {
if index > 0 {
out.push(',');
}
append_json_string(out, key)?;
out.push(':');
append_canonical_json(
out,
values
.get(key)
.expect("serde_json object key should resolve"),
)?;
}
Ok(())
}
pub(crate) fn append_json_string(out: &mut String, value: &str) -> Result<()> {
const HEX: &[u8; 16] = b"0123456789abcdef";
out.push('"');
let mut chunk_start = 0;
for (index, byte) in value.bytes().enumerate() {
let escaped = match byte {
b'"' => Some("\\\""),
b'\\' => Some("\\\\"),
b'\n' => Some("\\n"),
b'\r' => Some("\\r"),
b'\t' => Some("\\t"),
0x08 => Some("\\b"),
0x0c => Some("\\f"),
0x00..=0x1f => {
out.push_str(&value[chunk_start..index]);
out.push_str("\\u00");
out.push(HEX[(byte >> 4) as usize] as char);
out.push(HEX[(byte & 0x0f) as usize] as char);
chunk_start = index + 1;
continue;
}
_ => None,
};
if let Some(escaped) = escaped {
out.push_str(&value[chunk_start..index]);
out.push_str(escaped);
chunk_start = index + 1;
}
}
out.push_str(&value[chunk_start..]);
out.push('"');
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn append_json_string_matches_serde_json_string_escaping() {
let samples = [
"",
"plain",
"quote\"slash\\",
"line\nreturn\rtab\tbackspace\u{0008}form\u{000c}",
"\u{0000}\u{0001}\u{001f}",
"München 日本語",
];
for sample in samples {
let mut actual = String::new();
append_json_string(&mut actual, sample).expect("append string");
assert_eq!(actual, serde_json::to_string(sample).expect("serde string"));
}
}
#[test]
fn canonical_json_string_escapes_object_keys_and_string_values() {
let value = json!({
"line\nkey": "quote\"slash\\",
"nested": {
"control": "\u{0001}",
"unicode": "München 日本語"
}
});
let canonical = canonical_json_string(&value).expect("canonical json");
assert_eq!(
canonical,
"{\"line\\nkey\":\"quote\\\"slash\\\\\",\"nested\":{\"control\":\"\\u0001\",\"unicode\":\"München 日本語\"}}"
);
}
}