1use std::fmt;
5
6use crate::transport::Request;
7use crate::types::{Filter, Pagination, SortOrder};
8use serde_json::Value;
9
/// Errors raised while turning a parsed topic and its raw payload into a request.
#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
    /// The operation (carried value) needs a record ID but none was supplied.
    #[error("missing required ID for {0} operation")]
    MissingId(DbOp),
    /// The payload could not be decoded as JSON; wraps the `serde_json` error.
    #[error("invalid JSON payload: {0}")]
    InvalidPayload(#[from] serde_json::Error),
    /// The payload exceeded `MAX_PAYLOAD_SIZE`; carries the offending length in bytes.
    #[error("payload too large ({0} bytes, max {MAX_PAYLOAD_SIZE})")]
    PayloadTooLarge(usize),
}
19
/// The kind of record-level operation addressed by a `$DB/{entity}/...` topic.
///
/// The topic shapes listed on each variant are the ones recognised by
/// `parse_db_topic`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbOp {
    /// `$DB/{entity}/create`
    Create,
    /// `$DB/{entity}/{id}`
    Read,
    /// `$DB/{entity}/{id}/update`
    Update,
    /// `$DB/{entity}/{id}/delete`
    Delete,
    /// `$DB/{entity}/list`
    List,
    /// `$DB/{entity}/{id}/share`
    Share,
    /// `$DB/{entity}/{id}/unshare`
    Unshare,
    /// `$DB/{entity}/{id}/shares`
    Shares,
    /// `$DB/{entity}/shared`
    Shared,
}
32
33impl fmt::Display for DbOp {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 match self {
36 Self::Create => f.write_str("create"),
37 Self::Read => f.write_str("read"),
38 Self::Update => f.write_str("update"),
39 Self::Delete => f.write_str("delete"),
40 Self::List => f.write_str("list"),
41 Self::Share => f.write_str("share"),
42 Self::Unshare => f.write_str("unshare"),
43 Self::Shares => f.write_str("shares"),
44 Self::Shared => f.write_str("shared"),
45 }
46 }
47}
48
/// A record-level operation decoded from a `$DB/...` topic by `parse_db_topic`.
#[derive(Debug, Clone)]
pub struct DbOperation {
    /// Entity name taken from the first topic segment after `$DB/`.
    pub entity: String,
    /// Which operation the topic addresses.
    pub operation: DbOp,
    /// Record ID from the topic; `None` for create, list and shared topics.
    pub id: Option<String>,
}
55
/// An administrative operation decoded from a topic by `parse_admin_topic`.
///
/// Each variant is annotated with the topic that maps to it.
#[derive(Debug, Clone)]
pub enum AdminOperation {
    /// `$DB/_admin/schema/{entity}/set`
    SchemaSet { entity: String },
    /// `$DB/_admin/schema/{entity}/get`
    SchemaGet { entity: String },
    /// `$DB/_admin/constraint/{entity}/add`
    ConstraintAdd { entity: String },
    /// `$DB/_admin/constraint/{entity}/list`
    ConstraintList { entity: String },
    /// `$DB/_admin/backup`
    Backup,
    /// `$DB/_admin/restore`
    Restore,
    /// `$DB/_admin/backup/list`
    BackupList,
    /// `$DB/_sub/subscribe`
    Subscribe,
    /// `$DB/_sub/{sub_id}/heartbeat`
    Heartbeat { sub_id: String },
    /// `$DB/_sub/{sub_id}/unsubscribe`
    Unsubscribe { sub_id: String },
    /// `$DB/_admin/consumer-groups`
    ConsumerGroupList,
    /// `$DB/_admin/consumer-groups/{name}`
    ConsumerGroupShow { name: String },
    /// `$DB/_health`
    Health,
    /// `$DB/_admin/users/add`
    UserAdd,
    /// `$DB/_admin/users/delete`
    UserDelete,
    /// `$DB/_admin/users/list`
    UserList,
    /// `$DB/_admin/acl/rules/add`
    AclRuleAdd,
    /// `$DB/_admin/acl/rules/remove`
    AclRuleRemove,
    /// `$DB/_admin/acl/rules/list`
    AclRuleList,
    /// `$DB/_admin/acl/roles/add`
    AclRoleAdd,
    /// `$DB/_admin/acl/roles/delete`
    AclRoleDelete,
    /// `$DB/_admin/acl/roles/list`
    AclRoleList,
    /// `$DB/_admin/acl/assignments/assign`
    AclAssignmentAssign,
    /// `$DB/_admin/acl/assignments/unassign`
    AclAssignmentUnassign,
    /// `$DB/_admin/acl/assignments/list`
    AclAssignmentList,
    /// `$DB/_admin/index/{entity}/add`
    IndexAdd { entity: String },
    /// `$DB/_admin/catalog`
    Catalog,
    /// `$DB/_vault/enable`
    VaultEnable,
    /// `$DB/_vault/unlock`
    VaultUnlock,
    /// `$DB/_vault/lock`
    VaultLock,
    /// `$DB/_vault/disable`
    VaultDisable,
    /// `$DB/_vault/change`
    VaultChange,
    /// `$DB/_vault/status`
    VaultStatus,
    /// `$DB/_auth/password/change`
    PasswordChange,
    /// `$DB/_auth/password/reset/start`
    PasswordResetStart,
    /// `$DB/_auth/password/reset/submit`
    PasswordResetSubmit,
}
95
/// Longest entity name, in bytes, accepted in a topic segment.
const MAX_ENTITY_NAME_LEN: usize = 128;
/// Longest record ID, in bytes, accepted in a topic segment.
const MAX_RECORD_ID_LEN: usize = 512;
/// Largest payload, in bytes, that `build_request` will accept (4 MiB).
const MAX_PAYLOAD_SIZE: usize = 4 * 1024 * 1024;
99
100fn is_valid_entity_name(name: &str) -> bool {
101 !name.is_empty()
102 && name.len() <= MAX_ENTITY_NAME_LEN
103 && name
104 .bytes()
105 .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-')
106}
107
108fn is_valid_record_id(id: &str) -> bool {
109 !id.is_empty()
110 && id.len() <= MAX_RECORD_ID_LEN
111 && !id.contains('+')
112 && !id.contains('#')
113 && !id.contains('/')
114}
115
/// Tuple returned by `extract_list_options`, in order:
/// `(filters, sort, pagination, includes, projection)`.
type ListOptions = (
    Vec<Filter>,
    Vec<SortOrder>,
    Option<Pagination>,
    Vec<String>,
    Option<Vec<String>>,
);
123
124#[allow(clippy::must_use_candidate)]
125pub fn parse_admin_topic(topic: &str) -> Option<AdminOperation> {
126 if topic == "$DB/_health" {
127 return Some(AdminOperation::Health);
128 }
129
130 if let Some(rest) = topic.strip_prefix("$DB/_sub/") {
131 let parts: Vec<&str> = rest.split('/').collect();
132 return match parts.as_slice() {
133 ["subscribe"] => Some(AdminOperation::Subscribe),
134 [id, "heartbeat"] => Some(AdminOperation::Heartbeat {
135 sub_id: (*id).to_string(),
136 }),
137 [id, "unsubscribe"] => Some(AdminOperation::Unsubscribe {
138 sub_id: (*id).to_string(),
139 }),
140 _ => None,
141 };
142 }
143
144 if let Some(rest) = topic.strip_prefix("$DB/_vault/") {
145 return match rest {
146 "enable" => Some(AdminOperation::VaultEnable),
147 "unlock" => Some(AdminOperation::VaultUnlock),
148 "lock" => Some(AdminOperation::VaultLock),
149 "disable" => Some(AdminOperation::VaultDisable),
150 "change" => Some(AdminOperation::VaultChange),
151 "status" => Some(AdminOperation::VaultStatus),
152 _ => None,
153 };
154 }
155
156 if let Some(rest) = topic.strip_prefix("$DB/_auth/") {
157 return match rest {
158 "password/change" => Some(AdminOperation::PasswordChange),
159 "password/reset/start" => Some(AdminOperation::PasswordResetStart),
160 "password/reset/submit" => Some(AdminOperation::PasswordResetSubmit),
161 _ => None,
162 };
163 }
164
165 let parts: Vec<&str> = topic.strip_prefix("$DB/_admin/")?.split('/').collect();
166
167 match parts.as_slice() {
168 ["schema", entity, "set"] if is_valid_entity_name(entity) => {
169 Some(AdminOperation::SchemaSet {
170 entity: (*entity).to_string(),
171 })
172 }
173 ["schema", entity, "get"] if is_valid_entity_name(entity) => {
174 Some(AdminOperation::SchemaGet {
175 entity: (*entity).to_string(),
176 })
177 }
178 ["constraint", entity, "add"] if is_valid_entity_name(entity) => {
179 Some(AdminOperation::ConstraintAdd {
180 entity: (*entity).to_string(),
181 })
182 }
183 ["constraint", entity, "list"] if is_valid_entity_name(entity) => {
184 Some(AdminOperation::ConstraintList {
185 entity: (*entity).to_string(),
186 })
187 }
188 ["index", entity, "add"] if is_valid_entity_name(entity) => {
189 Some(AdminOperation::IndexAdd {
190 entity: (*entity).to_string(),
191 })
192 }
193 ["backup"] => Some(AdminOperation::Backup),
194 ["backup", "list"] => Some(AdminOperation::BackupList),
195 ["restore"] => Some(AdminOperation::Restore),
196 ["consumer-groups"] => Some(AdminOperation::ConsumerGroupList),
197 ["consumer-groups", name] => Some(AdminOperation::ConsumerGroupShow {
198 name: (*name).to_string(),
199 }),
200 ["users", "add"] => Some(AdminOperation::UserAdd),
201 ["users", "delete"] => Some(AdminOperation::UserDelete),
202 ["users", "list"] => Some(AdminOperation::UserList),
203 ["acl", "rules", "add"] => Some(AdminOperation::AclRuleAdd),
204 ["acl", "rules", "remove"] => Some(AdminOperation::AclRuleRemove),
205 ["acl", "rules", "list"] => Some(AdminOperation::AclRuleList),
206 ["acl", "roles", "add"] => Some(AdminOperation::AclRoleAdd),
207 ["acl", "roles", "delete"] => Some(AdminOperation::AclRoleDelete),
208 ["acl", "roles", "list"] => Some(AdminOperation::AclRoleList),
209 ["acl", "assignments", "assign"] => Some(AdminOperation::AclAssignmentAssign),
210 ["acl", "assignments", "unassign"] => Some(AdminOperation::AclAssignmentUnassign),
211 ["acl", "assignments", "list"] => Some(AdminOperation::AclAssignmentList),
212 ["catalog"] => Some(AdminOperation::Catalog),
213 _ => None,
214 }
215}
216
217#[allow(clippy::must_use_candidate)]
218pub fn parse_db_topic(topic: &str) -> Option<DbOperation> {
219 let parts: Vec<&str> = topic.strip_prefix("$DB/")?.split('/').collect();
220
221 match parts.as_slice() {
222 [entity, "create"] if is_valid_entity_name(entity) => Some(DbOperation {
223 entity: (*entity).to_string(),
224 operation: DbOp::Create,
225 id: None,
226 }),
227 [entity, "list"] if is_valid_entity_name(entity) => Some(DbOperation {
228 entity: (*entity).to_string(),
229 operation: DbOp::List,
230 id: None,
231 }),
232 [entity, "shared"] if is_valid_entity_name(entity) => Some(DbOperation {
233 entity: (*entity).to_string(),
234 operation: DbOp::Shared,
235 id: None,
236 }),
237 [entity, id, "share"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
238 Some(DbOperation {
239 entity: (*entity).to_string(),
240 operation: DbOp::Share,
241 id: Some((*id).to_string()),
242 })
243 }
244 [entity, id, "unshare"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
245 Some(DbOperation {
246 entity: (*entity).to_string(),
247 operation: DbOp::Unshare,
248 id: Some((*id).to_string()),
249 })
250 }
251 [entity, id, "shares"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
252 Some(DbOperation {
253 entity: (*entity).to_string(),
254 operation: DbOp::Shares,
255 id: Some((*id).to_string()),
256 })
257 }
258 [entity, id] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
259 Some(DbOperation {
260 entity: (*entity).to_string(),
261 operation: DbOp::Read,
262 id: Some((*id).to_string()),
263 })
264 }
265 [entity, id, "update"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
266 Some(DbOperation {
267 entity: (*entity).to_string(),
268 operation: DbOp::Update,
269 id: Some((*id).to_string()),
270 })
271 }
272 [entity, id, "delete"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
273 Some(DbOperation {
274 entity: (*entity).to_string(),
275 operation: DbOp::Delete,
276 id: Some((*id).to_string()),
277 })
278 }
279 _ => None,
280 }
281}
282
283pub fn build_request(op: DbOperation, payload: &[u8]) -> Result<Request, ProtocolError> {
288 if payload.len() > MAX_PAYLOAD_SIZE {
289 return Err(ProtocolError::PayloadTooLarge(payload.len()));
290 }
291 let data: Value = if payload.is_empty() {
292 Value::Null
293 } else {
294 serde_json::from_slice(payload)?
295 };
296
297 match op.operation {
298 DbOp::Create => Ok(Request::Create {
299 entity: op.entity,
300 data,
301 }),
302 DbOp::Read => {
303 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Read))?;
304 let (includes, projection) = extract_read_options(&data);
305 Ok(Request::Read {
306 entity: op.entity,
307 id,
308 includes,
309 projection,
310 })
311 }
312 DbOp::Update => {
313 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Update))?;
314 Ok(Request::Update {
315 entity: op.entity,
316 id,
317 fields: data,
318 })
319 }
320 DbOp::Delete => {
321 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Delete))?;
322 Ok(Request::Delete {
323 entity: op.entity,
324 id,
325 })
326 }
327 DbOp::List => {
328 let (filters, sort, pagination, includes, projection) = extract_list_options(&data);
329 Ok(Request::List {
330 entity: op.entity,
331 filters,
332 sort,
333 pagination,
334 includes,
335 projection,
336 })
337 }
338 DbOp::Share => {
339 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Share))?;
340 let grantee = data
341 .get("grantee")
342 .and_then(Value::as_str)
343 .unwrap_or_default()
344 .to_string();
345 let permission = data
346 .get("permission")
347 .and_then(Value::as_str)
348 .unwrap_or("view")
349 .to_string();
350 let cascade = data.get("cascade").and_then(Value::as_bool).unwrap_or(true);
351 Ok(Request::Share {
352 entity: op.entity,
353 id,
354 grantee,
355 permission,
356 cascade,
357 })
358 }
359 DbOp::Unshare => {
360 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Unshare))?;
361 let grantee = data
362 .get("grantee")
363 .and_then(Value::as_str)
364 .unwrap_or_default()
365 .to_string();
366 let cascade = data.get("cascade").and_then(Value::as_bool).unwrap_or(true);
367 Ok(Request::Unshare {
368 entity: op.entity,
369 id,
370 grantee,
371 cascade,
372 })
373 }
374 DbOp::Shares => {
375 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Shares))?;
376 Ok(Request::Shares {
377 entity: op.entity,
378 id,
379 })
380 }
381 DbOp::Shared => Ok(Request::Shared { entity: op.entity }),
382 }
383}
384
385fn extract_read_options(data: &Value) -> (Vec<String>, Option<Vec<String>>) {
386 let includes = data
387 .get("includes")
388 .and_then(|v| v.as_array())
389 .map(|arr| {
390 arr.iter()
391 .filter_map(|v| v.as_str().map(String::from))
392 .collect()
393 })
394 .unwrap_or_default();
395
396 let projection = data
397 .get("projection")
398 .and_then(|v| v.as_array())
399 .map(|arr| {
400 arr.iter()
401 .filter_map(|v| v.as_str().map(String::from))
402 .collect()
403 });
404
405 (includes, projection)
406}
407
408#[allow(clippy::cast_possible_truncation)]
409fn extract_list_options(data: &Value) -> ListOptions {
410 let filters: Vec<Filter> = data
411 .get("filters")
412 .and_then(|v| serde_json::from_value(v.clone()).ok())
413 .unwrap_or_default();
414
415 let sort: Vec<SortOrder> = data
416 .get("sort")
417 .and_then(|v| serde_json::from_value(v.clone()).ok())
418 .unwrap_or_default();
419
420 let pagination: Option<Pagination> = data
421 .get("pagination")
422 .and_then(|v| serde_json::from_value(v.clone()).ok())
423 .or_else(|| {
424 let limit = data
425 .get("limit")
426 .and_then(serde_json::Value::as_u64)
427 .map(|v| v as usize);
428 let offset = data
429 .get("offset")
430 .and_then(serde_json::Value::as_u64)
431 .map(|v| v as usize);
432 match (limit, offset) {
433 (Some(l), Some(o)) => Some(Pagination::new(l, o)),
434 (Some(l), None) => Some(Pagination::new(l, 0)),
435 (None, Some(o)) => Some(Pagination::new(usize::MAX, o)),
436 (None, None) => None,
437 }
438 });
439
440 let includes = data
441 .get("includes")
442 .and_then(|v| v.as_array())
443 .map(|arr| {
444 arr.iter()
445 .filter_map(|v| v.as_str().map(String::from))
446 .collect()
447 })
448 .unwrap_or_default();
449
450 let projection = data
451 .get("projection")
452 .and_then(|v| v.as_array())
453 .map(|arr| {
454 arr.iter()
455 .filter_map(|v| v.as_str().map(String::from))
456 .collect()
457 });
458
459 (filters, sort, pagination, includes, projection)
460}
461
// Unit tests for topic parsing, request building and option extraction.
#[cfg(test)]
mod tests {
    use super::*;

    // `{entity}/create` parses to a Create operation without an ID.
    #[test]
    fn test_parse_db_topic_create() {
        let op = parse_db_topic("$DB/users/create").unwrap();
        assert_eq!(op.entity, "users");
        assert_eq!(op.operation, DbOp::Create);
        assert!(op.id.is_none());
    }

    // `{entity}/{id}` parses to a Read of that record.
    #[test]
    fn test_parse_db_topic_read() {
        let op = parse_db_topic("$DB/users/123").unwrap();
        assert_eq!(op.entity, "users");
        assert_eq!(op.operation, DbOp::Read);
        assert_eq!(op.id, Some("123".to_string()));
    }

    // `{entity}/{id}/update` parses to an Update of that record.
    #[test]
    fn test_parse_db_topic_update() {
        let op = parse_db_topic("$DB/users/123/update").unwrap();
        assert_eq!(op.entity, "users");
        assert_eq!(op.operation, DbOp::Update);
        assert_eq!(op.id, Some("123".to_string()));
    }

    // `{entity}/{id}/delete` parses to a Delete of that record.
    #[test]
    fn test_parse_db_topic_delete() {
        let op = parse_db_topic("$DB/users/123/delete").unwrap();
        assert_eq!(op.entity, "users");
        assert_eq!(op.operation, DbOp::Delete);
        assert_eq!(op.id, Some("123".to_string()));
    }

    // `{entity}/list` parses to a List operation without an ID.
    #[test]
    fn test_parse_db_topic_list() {
        let op = parse_db_topic("$DB/users/list").unwrap();
        assert_eq!(op.entity, "users");
        assert_eq!(op.operation, DbOp::List);
        assert!(op.id.is_none());
    }

    // Topics without the `$DB/` prefix or without an entity are rejected.
    #[test]
    fn test_parse_db_topic_invalid() {
        assert!(parse_db_topic("invalid/topic").is_none());
        assert!(parse_db_topic("$DB").is_none());
        assert!(parse_db_topic("$DB/").is_none());
    }

    // The four sharing topics map onto Share, Unshare, Shares and Shared.
    #[test]
    fn test_parse_db_topic_share_family() {
        let grant_op = parse_db_topic("$DB/diagrams/abc/share").unwrap();
        assert_eq!(grant_op.entity, "diagrams");
        assert_eq!(grant_op.operation, DbOp::Share);
        assert_eq!(grant_op.id, Some("abc".to_string()));

        let revoke_op = parse_db_topic("$DB/diagrams/abc/unshare").unwrap();
        assert_eq!(revoke_op.operation, DbOp::Unshare);

        let list_grants_op = parse_db_topic("$DB/diagrams/abc/shares").unwrap();
        assert_eq!(list_grants_op.operation, DbOp::Shares);
        assert_eq!(list_grants_op.id, Some("abc".to_string()));

        let discovery_op = parse_db_topic("$DB/diagrams/shared").unwrap();
        assert_eq!(discovery_op.operation, DbOp::Shared);
        assert!(discovery_op.id.is_none());
    }

    // A share payload supplies grantee and permission; cascade defaults to true.
    #[test]
    fn test_build_share_request() {
        let op = DbOperation {
            entity: "diagrams".to_string(),
            operation: DbOp::Share,
            id: Some("abc".to_string()),
        };
        let payload = br#"{"grantee":"bob","permission":"edit"}"#;
        match build_request(op, payload).unwrap() {
            Request::Share {
                entity,
                id,
                grantee,
                permission,
                cascade,
            } => {
                assert_eq!(entity, "diagrams");
                assert_eq!(id, "abc");
                assert_eq!(grantee, "bob");
                assert_eq!(permission, "edit");
                assert!(cascade, "cascade defaults to true");
            }
            other => panic!("expected Share, got {other:?}"),
        }
    }

    // A create payload is passed through as the request's data.
    #[test]
    fn test_build_create_request() {
        let op = DbOperation {
            entity: "users".to_string(),
            operation: DbOp::Create,
            id: None,
        };
        let payload = br#"{"name": "Alice"}"#;
        let request = build_request(op, payload).unwrap();

        match request {
            Request::Create { entity, data } => {
                assert_eq!(entity, "users");
                assert_eq!(data["name"], "Alice");
            }
            _ => panic!("expected Create request"),
        }
    }

    // A read payload's `projection` array ends up on the Read request.
    #[test]
    fn test_build_read_request() {
        let op = DbOperation {
            entity: "users".to_string(),
            operation: DbOp::Read,
            id: Some("123".to_string()),
        };
        let payload = br#"{"projection": ["name", "email"]}"#;
        let request = build_request(op, payload).unwrap();

        match request {
            Request::Read {
                entity,
                id,
                projection,
                ..
            } => {
                assert_eq!(entity, "users");
                assert_eq!(id, "123");
                assert_eq!(
                    projection,
                    Some(vec!["name".to_string(), "email".to_string()])
                );
            }
            _ => panic!("expected Read request"),
        }
    }

    // A list payload's `filters` array is deserialized onto the List request.
    #[test]
    fn test_build_list_request() {
        let op = DbOperation {
            entity: "users".to_string(),
            operation: DbOp::List,
            id: None,
        };
        let payload = br#"{"filters": [{"field": "age", "op": "gt", "value": 18}]}"#;
        let request = build_request(op, payload).unwrap();

        match request {
            Request::List {
                entity, filters, ..
            } => {
                assert_eq!(entity, "users");
                assert_eq!(filters.len(), 1);
                assert_eq!(filters[0].field, "age");
            }
            _ => panic!("expected List request"),
        }
    }

    // Building a Read without an ID is an error.
    #[test]
    fn test_read_without_id_fails() {
        let op = DbOperation {
            entity: "users".to_string(),
            operation: DbOp::Read,
            id: None,
        };
        let result = build_request(op, &[]);
        assert!(result.is_err());
    }

    // `$DB/_health` is the health-check topic.
    #[test]
    fn test_parse_admin_topic_health() {
        let op = parse_admin_topic("$DB/_health").unwrap();
        assert!(matches!(op, AdminOperation::Health));
    }

    // `$DB/_admin/catalog` is the catalog topic.
    #[test]
    fn test_parse_admin_topic_catalog() {
        let op = parse_admin_topic("$DB/_admin/catalog").unwrap();
        assert!(matches!(op, AdminOperation::Catalog));
    }

    // Every `_vault` action is recognised; unknown actions are rejected.
    #[test]
    fn test_parse_vault_topics() {
        assert!(matches!(
            parse_admin_topic("$DB/_vault/enable"),
            Some(AdminOperation::VaultEnable)
        ));
        assert!(matches!(
            parse_admin_topic("$DB/_vault/unlock"),
            Some(AdminOperation::VaultUnlock)
        ));
        assert!(matches!(
            parse_admin_topic("$DB/_vault/lock"),
            Some(AdminOperation::VaultLock)
        ));
        assert!(matches!(
            parse_admin_topic("$DB/_vault/disable"),
            Some(AdminOperation::VaultDisable)
        ));
        assert!(matches!(
            parse_admin_topic("$DB/_vault/change"),
            Some(AdminOperation::VaultChange)
        ));
        assert!(matches!(
            parse_admin_topic("$DB/_vault/status"),
            Some(AdminOperation::VaultStatus)
        ));
        assert!(parse_admin_topic("$DB/_vault/unknown").is_none());
    }

    // Every `_auth/password` action is recognised; unknown paths are rejected.
    #[test]
    fn test_parse_auth_topics() {
        assert!(matches!(
            parse_admin_topic("$DB/_auth/password/change"),
            Some(AdminOperation::PasswordChange)
        ));
        assert!(matches!(
            parse_admin_topic("$DB/_auth/password/reset/start"),
            Some(AdminOperation::PasswordResetStart)
        ));
        assert!(matches!(
            parse_admin_topic("$DB/_auth/password/reset/submit"),
            Some(AdminOperation::PasswordResetSubmit)
        ));
        assert!(parse_admin_topic("$DB/_auth/unknown").is_none());
        assert!(parse_admin_topic("$DB/_auth/password/reset/other").is_none());
    }

    // Entity names: ASCII alphanumerics, `_` and `-`, 1..=128 bytes.
    #[test]
    fn entity_name_validation() {
        assert!(parse_db_topic("$DB/users/create").is_some());
        assert!(parse_db_topic("$DB/my-entity/create").is_some());
        assert!(parse_db_topic("$DB/my_entity/create").is_some());
        assert!(parse_db_topic("$DB/Entity123/create").is_some());
        assert!(parse_db_topic("$DB//create").is_none());
        assert!(parse_db_topic("$DB/has space/create").is_none());
        assert!(parse_db_topic("$DB/has.dot/create").is_none());
        assert!(parse_db_topic(&format!("$DB/{}/create", "a".repeat(129))).is_none());
        assert!(parse_db_topic(&format!("$DB/{}/create", "a".repeat(128))).is_some());
    }

    // Record IDs: no `+` or `#`, at most 512 bytes.
    #[test]
    fn record_id_validation() {
        assert!(parse_db_topic("$DB/users/valid-id").is_some());
        assert!(parse_db_topic("$DB/users/abc123").is_some());
        assert!(parse_db_topic("$DB/users/+").is_none());
        assert!(parse_db_topic("$DB/users/#").is_none());
        let long_id = "x".repeat(513);
        assert!(parse_db_topic(&format!("$DB/users/{long_id}")).is_none());
        let ok_id = "x".repeat(512);
        assert!(parse_db_topic(&format!("$DB/users/{ok_id}")).is_some());
    }

    // Admin topics apply the same entity-name rules as record topics.
    #[test]
    fn admin_entity_name_validation() {
        assert!(parse_admin_topic("$DB/_admin/schema/users/set").is_some());
        assert!(parse_admin_topic("$DB/_admin/schema/has space/set").is_none());
        assert!(parse_admin_topic("$DB/_admin/constraint/has.dot/add").is_none());
        assert!(parse_admin_topic(&format!("$DB/_admin/index/{}/add", "a".repeat(129))).is_none());
    }

    // A payload one byte over the limit is rejected before JSON decoding.
    #[test]
    fn payload_too_large_rejected() {
        let op = DbOperation {
            entity: "users".to_string(),
            operation: DbOp::Create,
            id: None,
        };
        let big_payload = vec![b'{'; MAX_PAYLOAD_SIZE + 1];
        let result = build_request(op, &big_payload);
        assert!(matches!(result, Err(ProtocolError::PayloadTooLarge(_))));
    }

    // A lone `eq` filter is extracted; every other option takes its default.
    #[test]
    fn test_extract_list_options_with_eq_filter() {
        let payload = serde_json::json!({
            "filters": [{"field": "email", "op": "eq", "value": "alice@example.com"}]
        });
        let (filters, sort, pagination, includes, projection) = extract_list_options(&payload);
        assert_eq!(filters.len(), 1);
        assert_eq!(filters[0].field, "email");
        assert!(matches!(filters[0].op, crate::FilterOp::Eq));
        assert_eq!(filters[0].value, serde_json::json!("alice@example.com"));
        assert!(sort.is_empty());
        assert!(pagination.is_none());
        assert!(includes.is_empty());
        assert!(projection.is_none());
    }
}