1use crate::error::{Result, SyncularError};
2use crate::limits::{
3 MAX_BLOB_PAYLOAD_BYTES, MAX_MUTATION_BATCH_JSON_BYTES, MAX_MUTATION_LOCAL_ROW_JSON_BYTES,
4 MAX_MUTATION_OPERATION_JSON_BYTES, MAX_OUTBOX_OPERATIONS_JSON_BYTES,
5 MAX_REALTIME_SYNC_PACK_BYTES, MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES,
6 MAX_SNAPSHOT_ARTIFACT_DECOMPRESSED_BYTES, MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES,
7 MAX_SNAPSHOT_CHUNK_DECOMPRESSED_BYTES, MAX_WEBSOCKET_TEXT_FRAME_BYTES,
8};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use sha2::{Digest, Sha256};
12use std::io::Read;
13pub use syncular_protocol::{
14 validate_scoped_snapshot_artifact_ref, AuthLeaseCapabilities, AuthLeaseIssueRequest,
15 AuthLeaseIssueResponse, AuthLeasePayload, AuthLeaseProtectedHeader, AuthLeaseProvenance,
16 AuthLeaseScope, AuthLeaseValidationResult, BlobDownloadUrlResponse, BlobRef,
17 BlobUploadCompleteResponse, BlobUploadInitRequest, BlobUploadInitResponse, BootstrapState,
18 CombinedRequest, CombinedResponse, CrdtStateVectorHint, OperationResult, PullRequest,
19 PullResponse, PushBatchRequest, PushBatchResponse, PushCommitRequest, PushCommitResponse,
20 RealtimePresenceEntry, RealtimePresenceEvent, RealtimePresenceRequest, RealtimePushRequest,
21 RealtimePushResponseData, RealtimeServerMessage, ScopeValues, ScopedSnapshotArtifactManifest,
22 ScopedSnapshotArtifactRef, SnapshotArtifactsRequest, SnapshotChunkRef, SnapshotManifest,
23 SnapshotManifestChunkRef, SubscriptionIntegrity, SubscriptionRequest, SubscriptionResponse,
24 SyncChange, SyncCommit, SyncOperation, SyncSnapshot, VerifiedCommitRoot, AUTH_LEASE_ALG_ES256,
25 AUTH_LEASE_CODE_BUSINESS_REJECTED, AUTH_LEASE_CODE_EXPIRED, AUTH_LEASE_CODE_INVALID,
26 AUTH_LEASE_CODE_MISSING, AUTH_LEASE_CODE_SCHEMA_MISMATCH, AUTH_LEASE_CODE_SCOPE_MISMATCH,
27 AUTH_LEASE_CODE_SCOPE_REVOKED, AUTH_LEASE_PROTOCOL_VERSION, AUTH_LEASE_TYP, AUTH_LEASE_VERSION,
28 BINARY_SYNC_PACK_WIRE_VERSION, COMMIT_INTEGRITY_GENESIS_ROOT, COMMIT_INTEGRITY_HEX_LENGTH,
29 REALTIME_CLIENT_MESSAGE_PRESENCE, REALTIME_CLIENT_MESSAGE_PUSH, REALTIME_SERVER_EVENT_PRESENCE,
30 REALTIME_SERVER_EVENT_PUSH_RESPONSE, REALTIME_SERVER_EVENT_SYNC,
31 SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1, SNAPSHOT_CHUNK_COMPRESSION_GZIP,
32 SNAPSHOT_CHUNK_ENCODING_BINARY_TABLE_V1, SNAPSHOT_MANIFEST_VERSION, SYNC_PACK_CONTENT_TYPE,
33 SYNC_PACK_ENCODING_BINARY_V1, WIRE_COMMIT_CHAIN_ROOT_VERSION, WIRE_COMMIT_DIGEST_VERSION,
34};
35use uuid::Uuid;
36
37pub fn validate_sqlite_snapshot_artifact_for_apply(
38 artifact: &ScopedSnapshotArtifactRef,
39 subscription_id: &str,
40 table: &str,
41) -> Result<()> {
42 validate_scoped_snapshot_artifact_ref(artifact)?;
43 validate_snapshot_artifact_ref_size(artifact)?;
44 if artifact.artifact_kind != SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1 {
45 return Err(SyncularError::protocol_message(format!(
46 "unsupported snapshot artifact kind {}",
47 artifact.artifact_kind
48 )));
49 }
50 if artifact.compression != SNAPSHOT_CHUNK_COMPRESSION_GZIP {
51 return Err(SyncularError::protocol_message(format!(
52 "unsupported snapshot artifact compression {}",
53 artifact.compression
54 )));
55 }
56 if artifact.manifest.subscription_id != subscription_id {
57 return Err(SyncularError::protocol_message(format!(
58 "snapshot artifact subscription mismatch: expected {}, got {}",
59 subscription_id, artifact.manifest.subscription_id
60 )));
61 }
62 if artifact.manifest.table != table {
63 return Err(SyncularError::protocol_message(format!(
64 "snapshot artifact table mismatch: expected {}, got {}",
65 table, artifact.manifest.table
66 )));
67 }
68 Ok(())
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
72pub enum SyncularMutationKind {
73 Insert,
74 Update,
75 Upsert,
76 Delete,
77}
78
79#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
80pub struct PendingSyncularMutation {
81 pub kind: SyncularMutationKind,
82 pub table: String,
83 pub row_id: String,
84 pub payload: Option<Value>,
85 pub base_version: Option<i64>,
86 pub local_row: Option<Value>,
87}
88
89impl PendingSyncularMutation {
90 pub fn operation(&self, base_version: Option<i64>) -> SyncOperation {
91 SyncOperation {
92 table: self.table.clone(),
93 row_id: self.row_id.clone(),
94 op: match self.kind {
95 SyncularMutationKind::Delete => "delete",
96 SyncularMutationKind::Insert
97 | SyncularMutationKind::Update
98 | SyncularMutationKind::Upsert => "upsert",
99 }
100 .to_string(),
101 payload: self.payload.clone(),
102 base_version,
103 }
104 }
105}
106
107pub trait IntoSyncularMutation {
108 fn into_syncular_mutation(self) -> PendingSyncularMutation;
109}
110
111impl IntoSyncularMutation for PendingSyncularMutation {
112 fn into_syncular_mutation(self) -> PendingSyncularMutation {
113 self
114 }
115}
116
117impl IntoSyncularMutation for SyncOperation {
118 fn into_syncular_mutation(self) -> PendingSyncularMutation {
119 PendingSyncularMutation {
120 kind: if self.op == "delete" {
121 SyncularMutationKind::Delete
122 } else {
123 SyncularMutationKind::Upsert
124 },
125 table: self.table,
126 row_id: self.row_id,
127 payload: self.payload,
128 base_version: self.base_version,
129 local_row: None,
130 }
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
135pub struct MutationReceipt {
136 pub commit_id: String,
137 pub client_commit_id: String,
138}
139
140#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
141pub struct MutationCommit<R> {
142 pub result: R,
143 pub commit: MutationReceipt,
144}
145
146#[derive(Debug, Default)]
147pub struct SyncularMutationBatch {
148 mutations: Vec<PendingSyncularMutation>,
149}
150
151impl SyncularMutationBatch {
152 pub fn new() -> Self {
153 Self {
154 mutations: Vec::new(),
155 }
156 }
157
158 pub fn push<M>(&mut self, mutation: M) -> String
159 where
160 M: IntoSyncularMutation,
161 {
162 let mutation = mutation.into_syncular_mutation();
163 let row_id = mutation.row_id.clone();
164 self.mutations.push(mutation);
165 row_id
166 }
167
168 pub fn is_empty(&self) -> bool {
169 self.mutations.is_empty()
170 }
171
172 pub fn mutations(&self) -> &[PendingSyncularMutation] {
173 &self.mutations
174 }
175
176 pub fn into_mutations(self) -> Vec<PendingSyncularMutation> {
177 self.mutations
178 }
179}
180
181pub fn validate_mutation_json_input_size(
182 operation_json: &str,
183 local_row_json: Option<&str>,
184) -> Result<()> {
185 validate_payload_bytes(
186 "maxMutationOperationJsonBytes",
187 operation_json.len(),
188 MAX_MUTATION_OPERATION_JSON_BYTES,
189 "Syncular mutation operation JSON exceeds the configured limit",
190 )?;
191 if let Some(local_row_json) = local_row_json {
192 validate_payload_bytes(
193 "maxMutationLocalRowJsonBytes",
194 local_row_json.len(),
195 MAX_MUTATION_LOCAL_ROW_JSON_BYTES,
196 "Syncular mutation local row JSON exceeds the configured limit",
197 )?;
198 }
199 Ok(())
200}
201
202pub fn validate_mutation_batch_json_input_size(operations_json: &str) -> Result<()> {
203 validate_payload_bytes(
204 "maxMutationBatchJsonBytes",
205 operations_json.len(),
206 MAX_MUTATION_BATCH_JSON_BYTES,
207 "Syncular mutation batch JSON exceeds the configured limit",
208 )
209}
210
211pub fn validate_pending_mutation_batch_size(mutations: &[PendingSyncularMutation]) -> Result<()> {
212 let bytes = serde_json::to_vec(mutations)?;
213 validate_payload_bytes(
214 "maxMutationBatchJsonBytes",
215 bytes.len(),
216 MAX_MUTATION_BATCH_JSON_BYTES,
217 "Syncular typed mutation batch exceeds the configured limit",
218 )
219}
220
221pub fn sync_operations_json_for_outbox(operations: &[SyncOperation]) -> Result<String> {
222 let operations_json = serde_json::to_string(operations)?;
223 validate_payload_bytes(
224 "maxOutboxOperationsJsonBytes",
225 operations_json.len(),
226 MAX_OUTBOX_OPERATIONS_JSON_BYTES,
227 "Syncular outbox operations JSON exceeds the configured limit",
228 )?;
229 Ok(operations_json)
230}
231
232pub fn validate_payload_bytes(
233 limit: &'static str,
234 observed: usize,
235 max: usize,
236 message: &'static str,
237) -> Result<()> {
238 if observed > max {
239 return Err(SyncularError::limit_exceeded(limit, observed, max, message));
240 }
241 Ok(())
242}
243
244pub fn random_syncular_id() -> String {
245 Uuid::new_v4().to_string()
246}
247
248pub fn validate_pull_commit_integrity_metadata(response: &PullResponse) -> Result<()> {
249 syncular_protocol::validate_pull_commit_integrity_metadata(response).map_err(Into::into)
250}
251
252pub fn verify_subscription_commit_integrity(
253 subscription_id: &str,
254 stored_root: Option<&str>,
255 integrity: Option<&SubscriptionIntegrity>,
256 commits: &[SyncCommit],
257) -> Result<Option<VerifiedCommitRoot>> {
258 syncular_protocol::verify_subscription_commit_integrity(
259 subscription_id,
260 stored_root,
261 integrity,
262 commits,
263 )
264 .map_err(Into::into)
265}
266
267pub fn validate_pull_snapshot_manifests(response: &PullResponse) -> Result<()> {
268 syncular_protocol::validate_pull_snapshot_manifests(response).map_err(SyncularError::from)?;
269 for subscription in &response.subscriptions {
270 if let Some(snapshots) = &subscription.snapshots {
271 for snapshot in snapshots {
272 if let Some(chunks) = &snapshot.chunks {
273 for chunk in chunks {
274 validate_snapshot_chunk_ref_size(chunk)?;
275 }
276 }
277 if let Some(artifacts) = &snapshot.artifacts {
278 for artifact in artifacts {
279 validate_snapshot_artifact_ref_size(artifact)?;
280 }
281 }
282 }
283 }
284 }
285 Ok(())
286}
287
288pub fn wire_commit_digest(
289 partition_id: &str,
290 subscription_id: &str,
291 commit: &SyncCommit,
292) -> Result<String> {
293 syncular_protocol::wire_commit_digest(partition_id, subscription_id, commit).map_err(Into::into)
294}
295
296pub fn wire_commit_chain_root(
297 partition_id: &str,
298 subscription_id: &str,
299 previous_chain_root: &str,
300 commit_seq: i64,
301 commit_digest: &str,
302) -> Result<String> {
303 syncular_protocol::wire_commit_chain_root(
304 partition_id,
305 subscription_id,
306 previous_chain_root,
307 commit_seq,
308 commit_digest,
309 )
310 .map_err(Into::into)
311}
312
313pub fn wire_commit_chain_root_from_digest(
314 partition_id: &str,
315 subscription_id: &str,
316 previous_chain_root: &str,
317 commit_seq: i64,
318 commit_digest: &str,
319) -> Result<String> {
320 syncular_protocol::wire_commit_chain_root_from_digest(
321 partition_id,
322 subscription_id,
323 previous_chain_root,
324 commit_seq,
325 commit_digest,
326 )
327 .map_err(Into::into)
328}
329
330pub fn snapshot_manifest_digest(manifest: &SnapshotManifest) -> Result<String> {
331 syncular_protocol::snapshot_manifest_digest(manifest).map_err(Into::into)
332}
333
334pub fn blob_hash(data: &[u8]) -> String {
335 syncular_protocol::blob_hash(data)
336}
337
338pub fn normalize_blob_mime_type(mime_type: &str) -> String {
339 syncular_protocol::normalize_blob_mime_type(mime_type)
340}
341
342pub fn blob_hash_reader(mut reader: impl Read) -> Result<(String, i64)> {
343 let mut hasher = Sha256::new();
344 let mut size = 0i64;
345 let mut buffer = [0u8; 64 * 1024];
346 loop {
347 let read = reader.read(&mut buffer)?;
348 if read == 0 {
349 break;
350 }
351 size = size
352 .checked_add(i64::try_from(read).map_err(|_| {
353 SyncularError::protocol_message("blob chunk is too large for size metadata")
354 })?)
355 .ok_or_else(|| SyncularError::protocol_message("blob is too large"))?;
356 validate_blob_size_bytes(size)?;
357 hasher.update(&buffer[..read]);
358 }
359 Ok((format!("sha256:{}", hex::encode(hasher.finalize())), size))
360}
361
362pub fn validate_blob_hash(hash: &str) -> Result<()> {
363 syncular_protocol::validate_blob_hash(hash).map_err(Into::into)
364}
365
366pub fn validate_blob_size_bytes(size: i64) -> Result<()> {
367 if size < 0 {
368 return Err(SyncularError::protocol_message(
369 "blob size cannot be negative",
370 ));
371 }
372 if size > MAX_BLOB_PAYLOAD_BYTES {
373 return Err(SyncularError::limit_exceeded(
374 "maxBlobPayloadBytes",
375 usize::try_from(size).unwrap_or(usize::MAX),
376 usize::try_from(MAX_BLOB_PAYLOAD_BYTES).unwrap_or(usize::MAX),
377 "Syncular blob payload exceeds the configured limit",
378 ));
379 }
380 Ok(())
381}
382
383pub fn validate_blob_ref_size(blob: &BlobRef) -> Result<()> {
384 validate_blob_size_bytes(blob.size)
385}
386
387pub fn validate_blob_bytes(blob: &BlobRef, data: &[u8]) -> Result<()> {
388 validate_payload_bytes(
389 "maxBlobPayloadBytes",
390 data.len(),
391 usize::try_from(MAX_BLOB_PAYLOAD_BYTES).unwrap_or(usize::MAX),
392 "Syncular blob payload exceeds the configured limit",
393 )?;
394 validate_blob_ref_size(blob)?;
395 syncular_protocol::validate_blob_bytes(blob, data).map_err(Into::into)
396}
397
398pub fn validate_blob_digest(blob: &BlobRef, actual_hash: &str, actual_size: i64) -> Result<()> {
399 validate_blob_ref_size(blob)?;
400 validate_blob_size_bytes(actual_size)?;
401 syncular_protocol::validate_blob_digest(blob, actual_hash, actual_size).map_err(Into::into)
402}
403
404pub fn validate_snapshot_chunk_ref_size(chunk: &SnapshotChunkRef) -> Result<()> {
405 validate_i64_payload_bytes(
406 "maxSnapshotChunkCompressedBytes",
407 chunk.byte_length,
408 MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES,
409 "Syncular snapshot chunk compressed payload exceeds the configured limit",
410 )
411}
412
413pub fn validate_snapshot_chunk_compressed_bytes(
414 chunk: &SnapshotChunkRef,
415 bytes: &[u8],
416) -> Result<()> {
417 validate_snapshot_chunk_ref_size(chunk)?;
418 validate_payload_bytes(
419 "maxSnapshotChunkCompressedBytes",
420 bytes.len(),
421 usize::try_from(MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES).unwrap_or(usize::MAX),
422 "Syncular snapshot chunk compressed payload exceeds the configured limit",
423 )?;
424 if bytes.len() as i64 != chunk.byte_length {
425 return Err(SyncularError::protocol_message(format!(
426 "snapshot chunk byte length mismatch: expected {}, got {}",
427 chunk.byte_length,
428 bytes.len()
429 )));
430 }
431 Ok(())
432}
433
434pub fn validate_snapshot_chunk_decompressed_bytes(bytes: &[u8]) -> Result<()> {
435 validate_payload_bytes(
436 "maxSnapshotChunkDecompressedBytes",
437 bytes.len(),
438 MAX_SNAPSHOT_CHUNK_DECOMPRESSED_BYTES,
439 "Syncular snapshot chunk decompressed payload exceeds the configured limit",
440 )
441}
442
443pub fn validate_snapshot_artifact_ref_size(artifact: &ScopedSnapshotArtifactRef) -> Result<()> {
444 validate_i64_payload_bytes(
445 "maxSnapshotArtifactCompressedBytes",
446 artifact.byte_length,
447 MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES,
448 "Syncular snapshot artifact compressed payload exceeds the configured limit",
449 )
450}
451
452pub fn validate_snapshot_artifact_compressed_bytes(
453 artifact: &ScopedSnapshotArtifactRef,
454 bytes: &[u8],
455) -> Result<()> {
456 validate_snapshot_artifact_ref_size(artifact)?;
457 validate_payload_bytes(
458 "maxSnapshotArtifactCompressedBytes",
459 bytes.len(),
460 usize::try_from(MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES).unwrap_or(usize::MAX),
461 "Syncular snapshot artifact compressed payload exceeds the configured limit",
462 )?;
463 if bytes.len() as i64 != artifact.byte_length {
464 return Err(SyncularError::protocol_message(format!(
465 "snapshot artifact byte length mismatch: expected {}, got {}",
466 artifact.byte_length,
467 bytes.len()
468 )));
469 }
470 Ok(())
471}
472
473pub fn validate_snapshot_artifact_decompressed_bytes(bytes: &[u8]) -> Result<()> {
474 validate_payload_bytes(
475 "maxSnapshotArtifactDecompressedBytes",
476 bytes.len(),
477 MAX_SNAPSHOT_ARTIFACT_DECOMPRESSED_BYTES,
478 "Syncular snapshot artifact decompressed payload exceeds the configured limit",
479 )
480}
481
482pub fn validate_realtime_sync_pack_bytes(bytes: &[u8]) -> Result<()> {
483 validate_payload_bytes(
484 "maxRealtimeSyncPackBytes",
485 bytes.len(),
486 MAX_REALTIME_SYNC_PACK_BYTES,
487 "Syncular realtime sync-pack payload exceeds the configured limit",
488 )
489}
490
491pub fn validate_websocket_text_frame_size(text: &str) -> Result<()> {
492 validate_payload_bytes(
493 "maxWebsocketTextFrameBytes",
494 text.len(),
495 MAX_WEBSOCKET_TEXT_FRAME_BYTES,
496 "Syncular websocket text frame exceeds the configured limit",
497 )
498}
499
500fn validate_i64_payload_bytes(
501 limit: &'static str,
502 observed: i64,
503 max: i64,
504 message: &'static str,
505) -> Result<()> {
506 if observed < 0 {
507 return Err(SyncularError::protocol_message(format!(
508 "{limit} observed byte length cannot be negative"
509 )));
510 }
511 if observed > max {
512 return Err(SyncularError::limit_exceeded(
513 limit,
514 usize::try_from(observed).unwrap_or(usize::MAX),
515 usize::try_from(max).unwrap_or(usize::MAX),
516 message,
517 ));
518 }
519 Ok(())
520}
521
522#[cfg(test)]
523mod tests {
524 use super::*;
525
526 #[test]
527 fn oversized_blob_ref_returns_stable_limit_error() {
528 let blob = BlobRef {
529 hash: format!("sha256:{}", "0".repeat(64)),
530 size: MAX_BLOB_PAYLOAD_BYTES + 1,
531 mime_type: "application/octet-stream".to_string(),
532 encrypted: false,
533 key_id: None,
534 };
535
536 let err = validate_blob_digest(&blob, &blob.hash, blob.size).unwrap_err();
537 let classification = err.classification();
538 assert_eq!(classification.code, "runtime.limit_exceeded");
539 assert_eq!(classification.category, "limit-exceeded");
540 assert_eq!(classification.recommended_action, "reduceInput");
541 assert!(err.message_text().contains("maxBlobPayloadBytes"));
542 }
543
544 #[test]
545 fn oversized_snapshot_refs_return_stable_limit_errors() {
546 let chunk = SnapshotChunkRef {
547 id: "chunk-1".to_string(),
548 byte_length: MAX_SNAPSHOT_CHUNK_COMPRESSED_BYTES + 1,
549 sha256: "0".repeat(64),
550 encoding: SNAPSHOT_CHUNK_ENCODING_BINARY_TABLE_V1.to_string(),
551 compression: SNAPSHOT_CHUNK_COMPRESSION_GZIP.to_string(),
552 };
553 let chunk_err = validate_snapshot_chunk_ref_size(&chunk).unwrap_err();
554 assert_eq!(chunk_err.classification().code, "runtime.limit_exceeded");
555 assert!(chunk_err
556 .message_text()
557 .contains("maxSnapshotChunkCompressedBytes"));
558
559 let mut manifest = ScopedSnapshotArtifactManifest {
560 version: syncular_protocol::SCOPED_SNAPSHOT_ARTIFACT_MANIFEST_VERSION,
561 artifact_kind: SCOPED_SNAPSHOT_ARTIFACT_KIND_SQLITE_V1.to_string(),
562 digest: "artifact-digest".to_string(),
563 partition_id: "partition-1".to_string(),
564 subscription_id: "sub-1".to_string(),
565 table: "tasks".to_string(),
566 schema_version: "1".to_string(),
567 as_of_commit_seq: 1,
568 scope_digest: "scope-digest".to_string(),
569 row_cursor: None,
570 byte_length: MAX_SNAPSHOT_ARTIFACT_COMPRESSED_BYTES + 1,
571 sha256: "0".repeat(64),
572 compression: SNAPSHOT_CHUNK_COMPRESSION_GZIP.to_string(),
573 row_count: 1,
574 row_limit: 1,
575 next_row_cursor: None,
576 is_first_page: true,
577 is_last_page: true,
578 feature_set: Vec::new(),
579 };
580 manifest.digest =
581 syncular_protocol::scoped_snapshot_artifact_manifest_digest(&manifest).expect("digest");
582 let artifact = ScopedSnapshotArtifactRef {
583 id: "artifact-1".to_string(),
584 manifest_digest: manifest.digest.clone(),
585 byte_length: manifest.byte_length,
586 sha256: manifest.sha256.clone(),
587 artifact_kind: manifest.artifact_kind.clone(),
588 compression: manifest.compression.clone(),
589 row_count: manifest.row_count,
590 next_row_cursor: manifest.next_row_cursor.clone(),
591 is_first_page: manifest.is_first_page,
592 is_last_page: manifest.is_last_page,
593 manifest,
594 };
595 let artifact_err = validate_snapshot_artifact_ref_size(&artifact).unwrap_err();
596 assert_eq!(artifact_err.classification().code, "runtime.limit_exceeded");
597 assert!(artifact_err
598 .message_text()
599 .contains("maxSnapshotArtifactCompressedBytes"));
600 }
601
602 #[test]
603 fn oversized_realtime_payloads_return_stable_limit_errors() {
604 let sync_pack = vec![0u8; MAX_REALTIME_SYNC_PACK_BYTES + 1];
605 let sync_pack_err = validate_realtime_sync_pack_bytes(&sync_pack).unwrap_err();
606 assert_eq!(
607 sync_pack_err.classification().code,
608 "runtime.limit_exceeded"
609 );
610 assert!(sync_pack_err
611 .message_text()
612 .contains("maxRealtimeSyncPackBytes"));
613
614 let frame = "x".repeat(MAX_WEBSOCKET_TEXT_FRAME_BYTES + 1);
615 let frame_err = validate_websocket_text_frame_size(&frame).unwrap_err();
616 assert_eq!(frame_err.classification().code, "runtime.limit_exceeded");
617 assert!(frame_err
618 .message_text()
619 .contains("maxWebsocketTextFrameBytes"));
620 }
621}