Skip to main content

syncular_runtime/core/
protocol.rs

1use crate::error::{Result, SyncularError};
2use crate::limits::{
3    MAX_BLOB_PAYLOAD_BYTES, MAX_MUTATION_BATCH_JSON_BYTES, MAX_MUTATION_LOCAL_ROW_JSON_BYTES,
4    MAX_MUTATION_OPERATION_JSON_BYTES, MAX_OUTBOX_OPERATIONS_JSON_BYTES,
5    MAX_REALTIME_SYNC_PACK_BYTES, MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES,
6    MAX_SNAPSHOT_ARTIFACT_DECOMPRESSED_BYTES, MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES,
7    MAX_SNAPSHOT_CHUNK_DECOMPRESSED_BYTES, MAX_WEBSOCKET_TEXT_FRAME_BYTES,
8};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use std::io::Read;
13pub use syncular_protocol::{
14    validate_scoped_snapshot_artifact_ref, AuthLeaseCapabilities, AuthLeaseIssueRequest,
15    AuthLeaseIssueResponse, AuthLeasePayload, AuthLeaseProtectedHeader, AuthLeaseProvenance,
16    AuthLeaseScope, AuthLeaseValidationResult, BlobDownloadUrlResponse, BlobRef,
17    BlobUploadCompleteResponse, BlobUploadInitRequest, BlobUploadInitResponse, BootstrapState,
18    CombinedRequest, CombinedResponse, CrdtStateVectorHint, OperationResult, PullRequest,
19    PullResponse, PushBatchRequest, PushBatchResponse, PushCommitRequest, PushCommitResponse,
20    RealtimePresenceEntry, RealtimePresenceEvent, RealtimePresenceRequest, RealtimePushRequest,
21    RealtimePushResponseData, RealtimeServerMessage, ScopeValues, ScopedSnapshotArtifactManifest,
22    ScopedSnapshotArtifactRef, SnapshotArtifactsRequest, SnapshotChunkRef, SnapshotManifest,
23    SnapshotManifestChunkRef, SubscriptionIntegrity, SubscriptionRequest, SubscriptionResponse,
24    SyncChange, SyncCommit, SyncOperation, SyncSnapshot, VerifiedCommitRoot, AUTH_LEASE_ALG_ES256,
25    AUTH_LEASE_CODE_BUSINESS_REJECTED, AUTH_LEASE_CODE_EXPIRED, AUTH_LEASE_CODE_INVALID,
26    AUTH_LEASE_CODE_MISSING, AUTH_LEASE_CODE_SCHEMA_MISMATCH, AUTH_LEASE_CODE_SCOPE_MISMATCH,
27    AUTH_LEASE_CODE_SCOPE_REVOKED, AUTH_LEASE_PROTOCOL_VERSION, AUTH_LEASE_TYP, AUTH_LEASE_VERSION,
28    BINARY_SYNC_PACK_WIRE_VERSION, COMMIT_INTEGRITY_GENESIS_ROOT, COMMIT_INTEGRITY_HEX_LENGTH,
29    REALTIME_CLIENT_MESSAGE_PRESENCE, REALTIME_CLIENT_MESSAGE_PUSH, REALTIME_SERVER_EVENT_PRESENCE,
30    REALTIME_SERVER_EVENT_PUSH_RESPONSE, REALTIME_SERVER_EVENT_SYNC,
31    SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1, SNAPSHOT_CHUNK_COMPRESSION_GZIP,
32    SNAPSHOT_CHUNK_ENCODING_BINARY_TABLE_V1, SNAPSHOT_MANIFEST_VERSION, SYNC_PACK_CONTENT_TYPE,
33    SYNC_PACK_ENCODING_BINARY_V1, WIRE_COMMIT_CHAIN_ROOT_VERSION, WIRE_COMMIT_DIGEST_VERSION,
34};
35use uuid::Uuid;
36
37pub fn validate_sqlite_snapshot_artifact_for_apply(
38    artifact: &ScopedSnapshotArtifactRef,
39    subscription_id: &str,
40    table: &str,
41) -> Result<()> {
42    validate_scoped_snapshot_artifact_ref(artifact)?;
43    validate_snapshot_artifact_ref_size(artifact)?;
44    if artifact.artifact_kind != SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1 {
45        return Err(SyncularError::protocol_message(format!(
46            "unsupported snapshot artifact kind {}",
47            artifact.artifact_kind
48        )));
49    }
50    if artifact.compression != SNAPSHOT_CHUNK_COMPRESSION_GZIP {
51        return Err(SyncularError::protocol_message(format!(
52            "unsupported snapshot artifact compression {}",
53            artifact.compression
54        )));
55    }
56    if artifact.manifest.subscription_id != subscription_id {
57        return Err(SyncularError::protocol_message(format!(
58            "snapshot artifact subscription mismatch: expected {}, got {}",
59            subscription_id, artifact.manifest.subscription_id
60        )));
61    }
62    if artifact.manifest.table != table {
63        return Err(SyncularError::protocol_message(format!(
64            "snapshot artifact table mismatch: expected {}, got {}",
65            table, artifact.manifest.table
66        )));
67    }
68    Ok(())
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
72pub enum SyncularMutationKind {
73    Insert,
74    Update,
75    Upsert,
76    Delete,
77}
78
79#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
80pub struct PendingSyncularMutation {
81    pub kind: SyncularMutationKind,
82    pub table: String,
83    pub row_id: String,
84    pub payload: Option<Value>,
85    pub base_version: Option<i64>,
86    pub local_row: Option<Value>,
87}
88
89impl PendingSyncularMutation {
90    pub fn operation(&self, base_version: Option<i64>) -> SyncOperation {
91        SyncOperation {
92            table: self.table.clone(),
93            row_id: self.row_id.clone(),
94            op: match self.kind {
95                SyncularMutationKind::Delete => "delete",
96                SyncularMutationKind::Insert
97                | SyncularMutationKind::Update
98                | SyncularMutationKind::Upsert => "upsert",
99            }
100            .to_string(),
101            payload: self.payload.clone(),
102            base_version,
103        }
104    }
105}
106
107pub trait IntoSyncularMutation {
108    fn into_syncular_mutation(self) -> PendingSyncularMutation;
109}
110
111impl IntoSyncularMutation for PendingSyncularMutation {
112    fn into_syncular_mutation(self) -> PendingSyncularMutation {
113        self
114    }
115}
116
117impl IntoSyncularMutation for SyncOperation {
118    fn into_syncular_mutation(self) -> PendingSyncularMutation {
119        PendingSyncularMutation {
120            kind: if self.op == "delete" {
121                SyncularMutationKind::Delete
122            } else {
123                SyncularMutationKind::Upsert
124            },
125            table: self.table,
126            row_id: self.row_id,
127            payload: self.payload,
128            base_version: self.base_version,
129            local_row: None,
130        }
131    }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
135pub struct MutationReceipt {
136    pub commit_id: String,
137    pub client_commit_id: String,
138}
139
140#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
141pub struct MutationCommit<R> {
142    pub result: R,
143    pub commit: MutationReceipt,
144}
145
146#[derive(Debug, Default)]
147pub struct SyncularMutationBatch {
148    mutations: Vec<PendingSyncularMutation>,
149}
150
151impl SyncularMutationBatch {
152    pub fn new() -> Self {
153        Self {
154            mutations: Vec::new(),
155        }
156    }
157
158    pub fn push<M>(&mut self, mutation: M) -> String
159    where
160        M: IntoSyncularMutation,
161    {
162        let mutation = mutation.into_syncular_mutation();
163        let row_id = mutation.row_id.clone();
164        self.mutations.push(mutation);
165        row_id
166    }
167
168    pub fn is_empty(&self) -> bool {
169        self.mutations.is_empty()
170    }
171
172    pub fn mutations(&self) -> &[PendingSyncularMutation] {
173        &self.mutations
174    }
175
176    pub fn into_mutations(self) -> Vec<PendingSyncularMutation> {
177        self.mutations
178    }
179}
180
181pub fn validate_mutation_json_input_size(
182    operation_json: &str,
183    local_row_json: Option<&str>,
184) -> Result<()> {
185    validate_payload_bytes(
186        "maxMutationOperationJsonBytes",
187        operation_json.len(),
188        MAX_MUTATION_OPERATION_JSON_BYTES,
189        "Syncular mutation operation JSON exceeds the configured limit",
190    )?;
191    if let Some(local_row_json) = local_row_json {
192        validate_payload_bytes(
193            "maxMutationLocalRowJsonBytes",
194            local_row_json.len(),
195            MAX_MUTATION_LOCAL_ROW_JSON_BYTES,
196            "Syncular mutation local row JSON exceeds the configured limit",
197        )?;
198    }
199    Ok(())
200}
201
202pub fn validate_mutation_batch_json_input_size(operations_json: &str) -> Result<()> {
203    validate_payload_bytes(
204        "maxMutationBatchJsonBytes",
205        operations_json.len(),
206        MAX_MUTATION_BATCH_JSON_BYTES,
207        "Syncular mutation batch JSON exceeds the configured limit",
208    )
209}
210
211pub fn validate_pending_mutation_batch_size(mutations: &[PendingSyncularMutation]) -> Result<()> {
212    let bytes = serde_json::to_vec(mutations)?;
213    validate_payload_bytes(
214        "maxMutationBatchJsonBytes",
215        bytes.len(),
216        MAX_MUTATION_BATCH_JSON_BYTES,
217        "Syncular typed mutation batch exceeds the configured limit",
218    )
219}
220
221pub fn sync_operations_json_for_outbox(operations: &[SyncOperation]) -> Result<String> {
222    let operations_json = serde_json::to_string(operations)?;
223    validate_payload_bytes(
224        "maxOutboxOperationsJsonBytes",
225        operations_json.len(),
226        MAX_OUTBOX_OPERATIONS_JSON_BYTES,
227        "Syncular outbox operations JSON exceeds the configured limit",
228    )?;
229    Ok(operations_json)
230}
231
232pub fn validate_payload_bytes(
233    limit: &'static str,
234    observed: usize,
235    max: usize,
236    message: &'static str,
237) -> Result<()> {
238    if observed > max {
239        return Err(SyncularError::limit_exceeded(limit, observed, max, message));
240    }
241    Ok(())
242}
243
244pub fn random_syncular_id() -> String {
245    Uuid::new_v4().to_string()
246}
247
248pub fn validate_pull_commit_integrity_metadata(response: &PullResponse) -> Result<()> {
249    syncular_protocol::validate_pull_commit_integrity_metadata(response).map_err(Into::into)
250}
251
252pub fn verify_subscription_commit_integrity(
253    subscription_id: &str,
254    stored_root: Option<&str>,
255    integrity: Option<&SubscriptionIntegrity>,
256    commits: &[SyncCommit],
257) -> Result<Option<VerifiedCommitRoot>> {
258    syncular_protocol::verify_subscription_commit_integrity(
259        subscription_id,
260        stored_root,
261        integrity,
262        commits,
263    )
264    .map_err(Into::into)
265}
266
267pub fn validate_pull_snapshot_manifests(response: &PullResponse) -> Result<()> {
268    syncular_protocol::validate_pull_snapshot_manifests(response).map_err(SyncularError::from)?;
269    for subscription in &response.subscriptions {
270        if let Some(snapshots) = &subscription.snapshots {
271            for snapshot in snapshots {
272                if let Some(chunks) = &snapshot.chunks {
273                    for chunk in chunks {
274                        validate_snapshot_chunk_ref_size(chunk)?;
275                    }
276                }
277                if let Some(artifacts) = &snapshot.artifacts {
278                    for artifact in artifacts {
279                        validate_snapshot_artifact_ref_size(artifact)?;
280                    }
281                }
282            }
283        }
284    }
285    Ok(())
286}
287
288pub fn wire_commit_digest(
289    partition_id: &str,
290    subscription_id: &str,
291    commit: &SyncCommit,
292) -> Result<String> {
293    syncular_protocol::wire_commit_digest(partition_id, subscription_id, commit).map_err(Into::into)
294}
295
296pub fn wire_commit_chain_root(
297    partition_id: &str,
298    subscription_id: &str,
299    previous_chain_root: &str,
300    commit_seq: i64,
301    commit_digest: &str,
302) -> Result<String> {
303    syncular_protocol::wire_commit_chain_root(
304        partition_id,
305        subscription_id,
306        previous_chain_root,
307        commit_seq,
308        commit_digest,
309    )
310    .map_err(Into::into)
311}
312
313pub fn wire_commit_chain_root_from_digest(
314    partition_id: &str,
315    subscription_id: &str,
316    previous_chain_root: &str,
317    commit_seq: i64,
318    commit_digest: &str,
319) -> Result<String> {
320    syncular_protocol::wire_commit_chain_root_from_digest(
321        partition_id,
322        subscription_id,
323        previous_chain_root,
324        commit_seq,
325        commit_digest,
326    )
327    .map_err(Into::into)
328}
329
330pub fn snapshot_manifest_digest(manifest: &SnapshotManifest) -> Result<String> {
331    syncular_protocol::snapshot_manifest_digest(manifest).map_err(Into::into)
332}
333
334pub fn blob_hash(data: &[u8]) -> String {
335    syncular_protocol::blob_hash(data)
336}
337
338pub fn normalize_blob_mime_type(mime_type: &str) -> String {
339    syncular_protocol::normalize_blob_mime_type(mime_type)
340}
341
342pub fn blob_hash_reader(mut reader: impl Read) -> Result<(String, i64)> {
343    let mut hasher = Sha256::new();
344    let mut size = 0i64;
345    let mut buffer = [0u8; 64 * 1024];
346    loop {
347        let read = reader.read(&mut buffer)?;
348        if read == 0 {
349            break;
350        }
351        size = size
352            .checked_add(i64::try_from(read).map_err(|_| {
353                SyncularError::protocol_message("blob chunk is too large for size metadata")
354            })?)
355            .ok_or_else(|| SyncularError::protocol_message("blob is too large"))?;
356        validate_blob_size_bytes(size)?;
357        hasher.update(&buffer[..read]);
358    }
359    Ok((format!("sha256:{}", hex::encode(hasher.finalize())), size))
360}
361
362pub fn validate_blob_hash(hash: &str) -> Result<()> {
363    syncular_protocol::validate_blob_hash(hash).map_err(Into::into)
364}
365
366pub fn validate_blob_size_bytes(size: i64) -> Result<()> {
367    if size < 0 {
368        return Err(SyncularError::protocol_message(
369            "blob size cannot be negative",
370        ));
371    }
372    if size > MAX_BLOB_PAYLOAD_BYTES {
373        return Err(SyncularError::limit_exceeded(
374            "maxBlobPayloadBytes",
375            usize::try_from(size).unwrap_or(usize::MAX),
376            usize::try_from(MAX_BLOB_PAYLOAD_BYTES).unwrap_or(usize::MAX),
377            "Syncular blob payload exceeds the configured limit",
378        ));
379    }
380    Ok(())
381}
382
383pub fn validate_blob_ref_size(blob: &BlobRef) -> Result<()> {
384    validate_blob_size_bytes(blob.size)
385}
386
387pub fn validate_blob_bytes(blob: &BlobRef, data: &[u8]) -> Result<()> {
388    validate_payload_bytes(
389        "maxBlobPayloadBytes",
390        data.len(),
391        usize::try_from(MAX_BLOB_PAYLOAD_BYTES).unwrap_or(usize::MAX),
392        "Syncular blob payload exceeds the configured limit",
393    )?;
394    validate_blob_ref_size(blob)?;
395    syncular_protocol::validate_blob_bytes(blob, data).map_err(Into::into)
396}
397
398pub fn validate_blob_digest(blob: &BlobRef, actual_hash: &str, actual_size: i64) -> Result<()> {
399    validate_blob_ref_size(blob)?;
400    validate_blob_size_bytes(actual_size)?;
401    syncular_protocol::validate_blob_digest(blob, actual_hash, actual_size).map_err(Into::into)
402}
403
404pub fn validate_snapshot_chunk_ref_size(chunk: &SnapshotChunkRef) -> Result<()> {
405    validate_i64_payload_bytes(
406        "maxSnapshotChunkCompressedBytes",
407        chunk.byte_length,
408        MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES,
409        "Syncular snapshot chunk compressed payload exceeds the configured limit",
410    )
411}
412
413pub fn validate_snapshot_chunk_compressed_bytes(
414    chunk: &SnapshotChunkRef,
415    bytes: &[u8],
416) -> Result<()> {
417    validate_snapshot_chunk_ref_size(chunk)?;
418    validate_payload_bytes(
419        "maxSnapshotChunkCompressedBytes",
420        bytes.len(),
421        usize::try_from(MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES).unwrap_or(usize::MAX),
422        "Syncular snapshot chunk compressed payload exceeds the configured limit",
423    )?;
424    if bytes.len() as i64 != chunk.byte_length {
425        return Err(SyncularError::protocol_message(format!(
426            "snapshot chunk byte length mismatch: expected {}, got {}",
427            chunk.byte_length,
428            bytes.len()
429        )));
430    }
431    Ok(())
432}
433
434pub fn validate_snapshot_chunk_decompressed_bytes(bytes: &[u8]) -> Result<()> {
435    validate_payload_bytes(
436        "maxSnapshotChunkDecompressedBytes",
437        bytes.len(),
438        MAX_SNAPSHOT_CHUNK_DECOMPRESSED_BYTES,
439        "Syncular snapshot chunk decompressed payload exceeds the configured limit",
440    )
441}
442
443pub fn validate_snapshot_artifact_ref_size(artifact: &ScopedSnapshotArtifactRef) -> Result<()> {
444    validate_i64_payload_bytes(
445        "maxSnapshotArtifactCompressedBytes",
446        artifact.byte_length,
447        MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES,
448        "Syncular snapshot artifact compressed payload exceeds the configured limit",
449    )
450}
451
452pub fn validate_snapshot_artifact_compressed_bytes(
453    artifact: &ScopedSnapshotArtifactRef,
454    bytes: &[u8],
455) -> Result<()> {
456    validate_snapshot_artifact_ref_size(artifact)?;
457    validate_payload_bytes(
458        "maxSnapshotArtifactCompressedBytes",
459        bytes.len(),
460        usize::try_from(MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES).unwrap_or(usize::MAX),
461        "Syncular snapshot artifact compressed payload exceeds the configured limit",
462    )?;
463    if bytes.len() as i64 != artifact.byte_length {
464        return Err(SyncularError::protocol_message(format!(
465            "snapshot artifact byte length mismatch: expected {}, got {}",
466            artifact.byte_length,
467            bytes.len()
468        )));
469    }
470    Ok(())
471}
472
473pub fn validate_snapshot_artifact_decompressed_bytes(bytes: &[u8]) -> Result<()> {
474    validate_payload_bytes(
475        "maxSnapshotArtifactDecompressedBytes",
476        bytes.len(),
477        MAX_SNAPSHOT_ARTIFACT_DECOMPRESSED_BYTES,
478        "Syncular snapshot artifact decompressed payload exceeds the configured limit",
479    )
480}
481
482pub fn validate_realtime_sync_pack_bytes(bytes: &[u8]) -> Result<()> {
483    validate_payload_bytes(
484        "maxRealtimeSyncPackBytes",
485        bytes.len(),
486        MAX_REALTIME_SYNC_PACK_BYTES,
487        "Syncular realtime sync-pack payload exceeds the configured limit",
488    )
489}
490
491pub fn validate_websocket_text_frame_size(text: &str) -> Result<()> {
492    validate_payload_bytes(
493        "maxWebsocketTextFrameBytes",
494        text.len(),
495        MAX_WEBSOCKET_TEXT_FRAME_BYTES,
496        "Syncular websocket text frame exceeds the configured limit",
497    )
498}
499
500fn validate_i64_payload_bytes(
501    limit: &'static str,
502    observed: i64,
503    max: i64,
504    message: &'static str,
505) -> Result<()> {
506    if observed < 0 {
507        return Err(SyncularError::protocol_message(format!(
508            "{limit} observed byte length cannot be negative"
509        )));
510    }
511    if observed > max {
512        return Err(SyncularError::limit_exceeded(
513            limit,
514            usize::try_from(observed).unwrap_or(usize::MAX),
515            usize::try_from(max).unwrap_or(usize::MAX),
516            message,
517        ));
518    }
519    Ok(())
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525
526    #[test]
527    fn oversized_blob_ref_returns_stable_limit_error() {
528        let blob = BlobRef {
529            hash: format!("sha256:{}", "0".repeat(64)),
530            size: MAX_BLOB_PAYLOAD_BYTES + 1,
531            mime_type: "application/octet-stream".to_string(),
532            encrypted: false,
533            key_id: None,
534        };
535
536        let err = validate_blob_digest(&blob, &blob.hash, blob.size).unwrap_err();
537        let classification = err.classification();
538        assert_eq!(classification.code, "runtime.limit_exceeded");
539        assert_eq!(classification.category, "limit-exceeded");
540        assert_eq!(classification.recommended_action, "reduceInput");
541        assert!(err.message_text().contains("maxBlobPayloadBytes"));
542    }
543
544    #[test]
545    fn oversized_snapshot_refs_return_stable_limit_errors() {
546        let chunk = SnapshotChunkRef {
547            id: "chunk-1".to_string(),
548            byte_length: MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES + 1,
549            sha256: "0".repeat(64),
550            encoding: SNAPSHOT_CHUNK_ENCODING_BINARY_TABLE_V1.to_string(),
551            compression: SNAPSHOT_CHUNK_COMPRESSION_GZIP.to_string(),
552        };
553        let chunk_err = validate_snapshot_chunk_ref_size(&chunk).unwrap_err();
554        assert_eq!(chunk_err.classification().code, "runtime.limit_exceeded");
555        assert!(chunk_err
556            .message_text()
557            .contains("maxSnapshotChunkCompressedBytes"));
558
559        let mut manifest = ScopedSnapshotArtifactManifest {
560            version: syncular_protocol::SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION,
561            artifact_kind: SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1.to_string(),
562            digest: "artifact-digest".to_string(),
563            partition_id: "partition-1".to_string(),
564            subscription_id: "sub-1".to_string(),
565            table: "tasks".to_string(),
566            schema_version: "1".to_string(),
567            as_of_commit_seq: 1,
568            scope_digest: "scope-digest".to_string(),
569            row_cursor: None,
570            byte_length: MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES + 1,
571            sha256: "0".repeat(64),
572            compression: SNAPSHOT_CHUNK_COMPRESSION_GZIP.to_string(),
573            row_count: 1,
574            row_limit: 1,
575            next_row_cursor: None,
576            is_first_page: true,
577            is_last_page: true,
578            feature_set: Vec::new(),
579        };
580        manifest.digest =
581            syncular_protocol::scoped_snapshot_artifact_manifest_digest(&manifest).expect("digest");
582        let artifact = ScopedSnapshotArtifactRef {
583            id: "artifact-1".to_string(),
584            manifest_digest: manifest.digest.clone(),
585            byte_length: manifest.byte_length,
586            sha256: manifest.sha256.clone(),
587            artifact_kind: manifest.artifact_kind.clone(),
588            compression: manifest.compression.clone(),
589            row_count: manifest.row_count,
590            next_row_cursor: manifest.next_row_cursor.clone(),
591            is_first_page: manifest.is_first_page,
592            is_last_page: manifest.is_last_page,
593            manifest,
594        };
595        let artifact_err = validate_snapshot_artifact_ref_size(&artifact).unwrap_err();
596        assert_eq!(artifact_err.classification().code, "runtime.limit_exceeded");
597        assert!(artifact_err
598            .message_text()
599            .contains("maxSnapshotArtifactCompressedBytes"));
600    }
601
602    #[test]
603    fn oversized_realtime_payloads_return_stable_limit_errors() {
604        let sync_pack = vec![0u8; MAX_REALTIME_SYNC_PACK_BYTES + 1];
605        let sync_pack_err = validate_realtime_sync_pack_bytes(&sync_pack).unwrap_err();
606        assert_eq!(
607            sync_pack_err.classification().code,
608            "runtime.limit_exceeded"
609        );
610        assert!(sync_pack_err
611            .message_text()
612            .contains("maxRealtimeSyncPackBytes"));
613
614        let frame = "x".repeat(MAX_WEBSOCKET_TEXT_FRAME_BYTES + 1);
615        let frame_err = validate_websocket_text_frame_size(&frame).unwrap_err();
616        assert_eq!(frame_err.classification().code, "runtime.limit_exceeded");
617        assert!(frame_err
618            .message_text()
619            .contains("maxWebsocketTextFrameBytes"));
620    }
621}