1use crate::ProtocolError;
2use crate::{
3 append_canonical_json, append_canonical_object, append_json_string, sha256_hex, PullResponse,
4 Result, SubscriptionIntegrity, SyncCommit, COMMIT_INTEGRITY_GENESIS_ROOT,
5 COMMIT_INTEGRITY_HEX_LENGTH, WIRE_COMMIT_CHAIN_ROOT_VERSION, WIRE_COMMIT_DIGEST_VERSION,
6};
7use std::fmt::Write as _;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct VerifiedCommitRoot {
11 pub partition_id: String,
12 pub commit_seq: i64,
13 pub root: String,
14}
15
16pub fn validate_pull_commit_integrity_metadata(response: &PullResponse) -> Result<()> {
17 for subscription in &response.subscriptions {
18 let Some(integrity) = &subscription.integrity else {
19 continue;
20 };
21 if subscription.commits.is_empty() {
22 return Err(ProtocolError::message(format!(
23 "subscription {} has integrity metadata without commits",
24 subscription.id
25 )));
26 }
27 validate_commit_integrity_hex(
28 "previousChainRoot",
29 &subscription.id,
30 integrity.commit_seq,
31 &integrity.previous_chain_root,
32 )?;
33 validate_commit_integrity_hex(
34 "commitChainRoot",
35 &subscription.id,
36 integrity.commit_seq,
37 &integrity.commit_chain_root,
38 )?;
39 let Some(last_commit) = subscription.commits.last() else {
40 continue;
41 };
42 if last_commit.commit_seq != integrity.commit_seq {
43 return Err(ProtocolError::message(format!(
44 "subscription {} integrity commitSeq mismatch: expected {}, got {}",
45 subscription.id, last_commit.commit_seq, integrity.commit_seq
46 )));
47 }
48 }
49 Ok(())
50}
51
52pub fn verify_subscription_commit_integrity(
53 subscription_id: &str,
54 stored_root: Option<&str>,
55 integrity: Option<&SubscriptionIntegrity>,
56 commits: &[SyncCommit],
57) -> Result<Option<VerifiedCommitRoot>> {
58 let Some(integrity) = integrity else {
59 return Ok(None);
60 };
61 let mut expected_previous_root = stored_root
62 .filter(|root| !root.is_empty())
63 .unwrap_or(COMMIT_INTEGRITY_GENESIS_ROOT)
64 .to_string();
65
66 if integrity.previous_chain_root != expected_previous_root {
67 return Err(ProtocolError::message(format!(
68 "subscription {subscription_id} previousChainRoot mismatch: expected {}, got {}",
69 expected_previous_root, integrity.previous_chain_root
70 )));
71 }
72
73 for commit in commits {
74 let actual_digest = wire_commit_digest(&integrity.partition_id, subscription_id, commit)?;
75 expected_previous_root = wire_commit_chain_root_from_digest(
76 &integrity.partition_id,
77 subscription_id,
78 &expected_previous_root,
79 commit.commit_seq,
80 &actual_digest,
81 )?;
82 }
83
84 if expected_previous_root != integrity.commit_chain_root {
85 return Err(ProtocolError::message(format!(
86 "subscription {subscription_id} commitChainRoot mismatch: expected {}, got {}",
87 integrity.commit_chain_root, expected_previous_root
88 )));
89 }
90
91 Ok(Some(VerifiedCommitRoot {
92 partition_id: integrity.partition_id.clone(),
93 commit_seq: integrity.commit_seq,
94 root: integrity.commit_chain_root.clone(),
95 }))
96}
97
98pub fn wire_commit_digest(
99 partition_id: &str,
100 subscription_id: &str,
101 commit: &SyncCommit,
102) -> Result<String> {
103 let mut payload = String::new();
104 append_wire_commit_digest_payload(&mut payload, partition_id, subscription_id, commit)?;
105 Ok(sha256_hex(&payload))
106}
107
108pub fn wire_commit_chain_root(
109 partition_id: &str,
110 subscription_id: &str,
111 previous_chain_root: &str,
112 commit_seq: i64,
113 commit_digest: &str,
114) -> Result<String> {
115 wire_commit_chain_root_from_digest(
116 partition_id,
117 subscription_id,
118 previous_chain_root,
119 commit_seq,
120 commit_digest,
121 )
122}
123
124pub fn wire_commit_chain_root_from_digest(
125 partition_id: &str,
126 subscription_id: &str,
127 previous_chain_root: &str,
128 commit_seq: i64,
129 commit_digest: &str,
130) -> Result<String> {
131 let mut payload = String::new();
132 append_wire_commit_chain_root_payload(
133 &mut payload,
134 partition_id,
135 subscription_id,
136 previous_chain_root,
137 commit_seq,
138 commit_digest,
139 )?;
140 Ok(sha256_hex(&payload))
141}
142
143fn append_wire_commit_digest_payload(
144 out: &mut String,
145 partition_id: &str,
146 subscription_id: &str,
147 commit: &SyncCommit,
148) -> Result<()> {
149 out.push_str("{\"actorId\":");
150 append_json_string(out, &commit.actor_id)?;
151 out.push_str(",\"changes\":[");
152 for (index, change) in commit.changes.iter().enumerate() {
153 if index > 0 {
154 out.push(',');
155 }
156 out.push_str("{\"op\":");
157 append_json_string(out, &change.op)?;
158 out.push_str(",\"row\":");
159 match &change.row_json {
160 Some(row) => append_canonical_json(out, row)?,
161 None => out.push_str("null"),
162 }
163 out.push_str(",\"rowId\":");
164 append_json_string(out, &change.row_id)?;
165 out.push_str(",\"rowVersion\":");
166 match change.row_version {
167 Some(row_version) => {
168 write!(out, "{row_version}").expect("writing to String should not fail")
169 }
170 None => out.push_str("null"),
171 }
172 out.push_str(",\"scopes\":");
173 append_canonical_object(out, &change.scopes)?;
174 out.push_str(",\"table\":");
175 append_json_string(out, &change.table)?;
176 out.push('}');
177 }
178 out.push_str("],\"commitSeq\":");
179 write!(out, "{}", commit.commit_seq).expect("writing to String should not fail");
180 out.push_str(",\"createdAt\":");
181 append_json_string(out, &commit.created_at)?;
182 out.push_str(",\"partitionId\":");
183 append_json_string(out, partition_id)?;
184 out.push_str(",\"subscriptionId\":");
185 append_json_string(out, subscription_id)?;
186 out.push_str(",\"version\":");
187 append_json_string(out, WIRE_COMMIT_DIGEST_VERSION)?;
188 out.push('}');
189 Ok(())
190}
191
192fn append_wire_commit_chain_root_payload(
193 out: &mut String,
194 partition_id: &str,
195 subscription_id: &str,
196 previous_chain_root: &str,
197 commit_seq: i64,
198 commit_digest: &str,
199) -> Result<()> {
200 out.push_str("{\"commitDigest\":");
201 append_json_string(out, commit_digest)?;
202 out.push_str(",\"commitSeq\":");
203 write!(out, "{commit_seq}").expect("writing to String should not fail");
204 out.push_str(",\"partitionId\":");
205 append_json_string(out, partition_id)?;
206 out.push_str(",\"previousChainRoot\":");
207 append_json_string(out, previous_chain_root)?;
208 out.push_str(",\"subscriptionId\":");
209 append_json_string(out, subscription_id)?;
210 out.push_str(",\"version\":");
211 append_json_string(out, WIRE_COMMIT_CHAIN_ROOT_VERSION)?;
212 out.push('}');
213 Ok(())
214}
215
216pub(crate) fn validate_commit_integrity_hex(
217 label: &str,
218 subscription_id: &str,
219 commit_seq: i64,
220 value: &str,
221) -> Result<()> {
222 if value.len() != COMMIT_INTEGRITY_HEX_LENGTH
223 || !value
224 .bytes()
225 .all(|byte| byte.is_ascii_digit() || (b'a'..=b'f').contains(&byte))
226 {
227 return Err(ProtocolError::message(format!(
228 "subscription {subscription_id} commit {commit_seq} {label} must be a lowercase 64-character SHA-256 hex string"
229 )));
230 }
231 Ok(())
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237 use crate::{ScopeValues, SyncChange};
238 use serde_json::json;
239
240 #[test]
241 fn verifies_subscription_commit_root() {
242 let change = SyncChange {
243 table: "tasks".to_string(),
244 row_id: "task-1".to_string(),
245 op: "upsert".to_string(),
246 row_json: Some(json!({"id":"task-1","title":"Ship"})),
247 row_version: Some(1),
248 scopes: ScopeValues::new(),
249 };
250 let commit = SyncCommit {
251 commit_seq: 7,
252 created_at: "2026-05-19T00:00:00.000Z".to_string(),
253 actor_id: "server".to_string(),
254 changes: vec![change],
255 };
256 let digest = wire_commit_digest("default", "sub-tasks", &commit).expect("digest");
257 let root = wire_commit_chain_root(
258 "default",
259 "sub-tasks",
260 COMMIT_INTEGRITY_GENESIS_ROOT,
261 7,
262 &digest,
263 )
264 .expect("root");
265 let verified = verify_subscription_commit_integrity(
266 "sub-tasks",
267 None,
268 Some(&SubscriptionIntegrity {
269 partition_id: "default".to_string(),
270 previous_chain_root: COMMIT_INTEGRITY_GENESIS_ROOT.to_string(),
271 commit_chain_root: root.clone(),
272 commit_seq: 7,
273 }),
274 &[commit],
275 )
276 .expect("valid root")
277 .expect("verified root");
278
279 assert_eq!(verified.root, root);
280 assert_eq!(verified.commit_seq, 7);
281 }
282}