Skip to main content

syncular_protocol/
integrity.rs

1use crate::ProtocolError;
2use crate::{
3    append_canonical_json, append_canonical_object, append_json_string, sha256_hex, PullResponse,
4    Result, SubscriptionIntegrity, SyncCommit, COMMIT_INTEGRITY_GENESIS_ROOT,
5    COMMIT_INTEGRITY_HEX_LENGTH, WIRE_COMMIT_CHAIN_ROOT_VERSION, WIRE_COMMIT_DIGEST_VERSION,
6};
7use std::fmt::Write as _;
8
/// Commit-chain root returned by [`verify_subscription_commit_integrity`]
/// once a batch of commits has been checked against its integrity metadata.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VerifiedCommitRoot {
    /// Partition id copied from the verified integrity metadata.
    pub partition_id: String,
    /// Commit sequence number copied from the verified integrity metadata.
    pub commit_seq: i64,
    /// Chain root that the recomputation confirmed.
    pub root: String,
}
15
16pub fn validate_pull_commit_integrity_metadata(response: &PullResponse) -> Result<()> {
17    for subscription in &response.subscriptions {
18        let Some(integrity) = &subscription.integrity else {
19            continue;
20        };
21        if subscription.commits.is_empty() {
22            return Err(ProtocolError::message(format!(
23                "subscription {} has integrity metadata without commits",
24                subscription.id
25            )));
26        }
27        validate_commit_integrity_hex(
28            "previousChainRoot",
29            &subscription.id,
30            integrity.commit_seq,
31            &integrity.previous_chain_root,
32        )?;
33        validate_commit_integrity_hex(
34            "commitChainRoot",
35            &subscription.id,
36            integrity.commit_seq,
37            &integrity.commit_chain_root,
38        )?;
39        let Some(last_commit) = subscription.commits.last() else {
40            continue;
41        };
42        if last_commit.commit_seq != integrity.commit_seq {
43            return Err(ProtocolError::message(format!(
44                "subscription {} integrity commitSeq mismatch: expected {}, got {}",
45                subscription.id, last_commit.commit_seq, integrity.commit_seq
46            )));
47        }
48    }
49    Ok(())
50}
51
52pub fn verify_subscription_commit_integrity(
53    subscription_id: &str,
54    stored_root: Option<&str>,
55    integrity: Option<&SubscriptionIntegrity>,
56    commits: &[SyncCommit],
57) -> Result<Option<VerifiedCommitRoot>> {
58    let Some(integrity) = integrity else {
59        return Ok(None);
60    };
61    let mut expected_previous_root = stored_root
62        .filter(|root| !root.is_empty())
63        .unwrap_or(COMMIT_INTEGRITY_GENESIS_ROOT)
64        .to_string();
65
66    if integrity.previous_chain_root != expected_previous_root {
67        return Err(ProtocolError::message(format!(
68            "subscription {subscription_id} previousChainRoot mismatch: expected {}, got {}",
69            expected_previous_root, integrity.previous_chain_root
70        )));
71    }
72
73    for commit in commits {
74        let actual_digest = wire_commit_digest(&integrity.partition_id, subscription_id, commit)?;
75        expected_previous_root = wire_commit_chain_root_from_digest(
76            &integrity.partition_id,
77            subscription_id,
78            &expected_previous_root,
79            commit.commit_seq,
80            &actual_digest,
81        )?;
82    }
83
84    if expected_previous_root != integrity.commit_chain_root {
85        return Err(ProtocolError::message(format!(
86            "subscription {subscription_id} commitChainRoot mismatch: expected {}, got {}",
87            integrity.commit_chain_root, expected_previous_root
88        )));
89    }
90
91    Ok(Some(VerifiedCommitRoot {
92        partition_id: integrity.partition_id.clone(),
93        commit_seq: integrity.commit_seq,
94        root: integrity.commit_chain_root.clone(),
95    }))
96}
97
98pub fn wire_commit_digest(
99    partition_id: &str,
100    subscription_id: &str,
101    commit: &SyncCommit,
102) -> Result<String> {
103    let mut payload = String::new();
104    append_wire_commit_digest_payload(&mut payload, partition_id, subscription_id, commit)?;
105    Ok(sha256_hex(&payload))
106}
107
108pub fn wire_commit_chain_root(
109    partition_id: &str,
110    subscription_id: &str,
111    previous_chain_root: &str,
112    commit_seq: i64,
113    commit_digest: &str,
114) -> Result<String> {
115    wire_commit_chain_root_from_digest(
116        partition_id,
117        subscription_id,
118        previous_chain_root,
119        commit_seq,
120        commit_digest,
121    )
122}
123
124pub fn wire_commit_chain_root_from_digest(
125    partition_id: &str,
126    subscription_id: &str,
127    previous_chain_root: &str,
128    commit_seq: i64,
129    commit_digest: &str,
130) -> Result<String> {
131    let mut payload = String::new();
132    append_wire_commit_chain_root_payload(
133        &mut payload,
134        partition_id,
135        subscription_id,
136        previous_chain_root,
137        commit_seq,
138        commit_digest,
139    )?;
140    Ok(sha256_hex(&payload))
141}
142
143fn append_wire_commit_digest_payload(
144    out: &mut String,
145    partition_id: &str,
146    subscription_id: &str,
147    commit: &SyncCommit,
148) -> Result<()> {
149    out.push_str("{\"actorId\":");
150    append_json_string(out, &commit.actor_id)?;
151    out.push_str(",\"changes\":[");
152    for (index, change) in commit.changes.iter().enumerate() {
153        if index > 0 {
154            out.push(',');
155        }
156        out.push_str("{\"op\":");
157        append_json_string(out, &change.op)?;
158        out.push_str(",\"row\":");
159        match &change.row_json {
160            Some(row) => append_canonical_json(out, row)?,
161            None => out.push_str("null"),
162        }
163        out.push_str(",\"rowId\":");
164        append_json_string(out, &change.row_id)?;
165        out.push_str(",\"rowVersion\":");
166        match change.row_version {
167            Some(row_version) => {
168                write!(out, "{row_version}").expect("writing to String should not fail")
169            }
170            None => out.push_str("null"),
171        }
172        out.push_str(",\"scopes\":");
173        append_canonical_object(out, &change.scopes)?;
174        out.push_str(",\"table\":");
175        append_json_string(out, &change.table)?;
176        out.push('}');
177    }
178    out.push_str("],\"commitSeq\":");
179    write!(out, "{}", commit.commit_seq).expect("writing to String should not fail");
180    out.push_str(",\"createdAt\":");
181    append_json_string(out, &commit.created_at)?;
182    out.push_str(",\"partitionId\":");
183    append_json_string(out, partition_id)?;
184    out.push_str(",\"subscriptionId\":");
185    append_json_string(out, subscription_id)?;
186    out.push_str(",\"version\":");
187    append_json_string(out, WIRE_COMMIT_DIGEST_VERSION)?;
188    out.push('}');
189    Ok(())
190}
191
192fn append_wire_commit_chain_root_payload(
193    out: &mut String,
194    partition_id: &str,
195    subscription_id: &str,
196    previous_chain_root: &str,
197    commit_seq: i64,
198    commit_digest: &str,
199) -> Result<()> {
200    out.push_str("{\"commitDigest\":");
201    append_json_string(out, commit_digest)?;
202    out.push_str(",\"commitSeq\":");
203    write!(out, "{commit_seq}").expect("writing to String should not fail");
204    out.push_str(",\"partitionId\":");
205    append_json_string(out, partition_id)?;
206    out.push_str(",\"previousChainRoot\":");
207    append_json_string(out, previous_chain_root)?;
208    out.push_str(",\"subscriptionId\":");
209    append_json_string(out, subscription_id)?;
210    out.push_str(",\"version\":");
211    append_json_string(out, WIRE_COMMIT_CHAIN_ROOT_VERSION)?;
212    out.push('}');
213    Ok(())
214}
215
216pub(crate) fn validate_commit_integrity_hex(
217    label: &str,
218    subscription_id: &str,
219    commit_seq: i64,
220    value: &str,
221) -> Result<()> {
222    if value.len() != COMMIT_INTEGRITY_HEX_LENGTH
223        || !value
224            .bytes()
225            .all(|byte| byte.is_ascii_digit() || (b'a'..=b'f').contains(&byte))
226    {
227        return Err(ProtocolError::message(format!(
228            "subscription {subscription_id} commit {commit_seq} {label} must be a lowercase 64-character SHA-256 hex string"
229        )));
230    }
231    Ok(())
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ScopeValues, SyncChange};
    use serde_json::json;

    /// A single commit chained onto the genesis root verifies, and the
    /// verified result echoes the declared root and sequence number.
    #[test]
    fn verifies_subscription_commit_root() {
        let partition = "default";
        let subscription = "sub-tasks";
        let seq = 7;

        let commit = SyncCommit {
            commit_seq: seq,
            created_at: String::from("2026-05-19T00:00:00.000Z"),
            actor_id: String::from("server"),
            changes: vec![SyncChange {
                table: String::from("tasks"),
                row_id: String::from("task-1"),
                op: String::from("upsert"),
                row_json: Some(json!({"id":"task-1","title":"Ship"})),
                row_version: Some(1),
                scopes: ScopeValues::new(),
            }],
        };

        let digest = wire_commit_digest(partition, subscription, &commit).expect("digest");
        let expected_root = wire_commit_chain_root(
            partition,
            subscription,
            COMMIT_INTEGRITY_GENESIS_ROOT,
            seq,
            &digest,
        )
        .expect("root");

        let integrity = SubscriptionIntegrity {
            partition_id: partition.to_string(),
            previous_chain_root: COMMIT_INTEGRITY_GENESIS_ROOT.to_string(),
            commit_chain_root: expected_root.clone(),
            commit_seq: seq,
        };

        let verified =
            verify_subscription_commit_integrity(subscription, None, Some(&integrity), &[commit])
                .expect("valid root")
                .expect("verified root");

        assert_eq!(verified.root, expected_root);
        assert_eq!(verified.commit_seq, 7);
    }
}