Skip to main content

mqdb_core/protocol/
mod.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::fmt;
5
6use crate::transport::Request;
7use crate::types::{Filter, Pagination, SortOrder};
8use serde_json::Value;
9
10#[derive(Debug, thiserror::Error)]
11pub enum ProtocolError {
12    #[error("missing required ID for {0} operation")]
13    MissingId(DbOp),
14    #[error("invalid JSON payload: {0}")]
15    InvalidPayload(#[from] serde_json::Error),
16    #[error("payload too large ({0} bytes, max {MAX_PAYLOAD_SIZE})")]
17    PayloadTooLarge(usize),
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum DbOp {
22    Create,
23    Read,
24    Update,
25    Delete,
26    List,
27    Share,
28    Unshare,
29    Shares,
30    Shared,
31}
32
33impl fmt::Display for DbOp {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        match self {
36            Self::Create => f.write_str("create"),
37            Self::Read => f.write_str("read"),
38            Self::Update => f.write_str("update"),
39            Self::Delete => f.write_str("delete"),
40            Self::List => f.write_str("list"),
41            Self::Share => f.write_str("share"),
42            Self::Unshare => f.write_str("unshare"),
43            Self::Shares => f.write_str("shares"),
44            Self::Shared => f.write_str("shared"),
45        }
46    }
47}
48
49#[derive(Debug, Clone)]
50pub struct DbOperation {
51    pub entity: String,
52    pub operation: DbOp,
53    pub id: Option<String>,
54}
55
56#[derive(Debug, Clone)]
57pub enum AdminOperation {
58    SchemaSet { entity: String },
59    SchemaGet { entity: String },
60    ConstraintAdd { entity: String },
61    ConstraintList { entity: String },
62    Backup,
63    Restore,
64    BackupList,
65    Subscribe,
66    Heartbeat { sub_id: String },
67    Unsubscribe { sub_id: String },
68    ConsumerGroupList,
69    ConsumerGroupShow { name: String },
70    Health,
71    UserAdd,
72    UserDelete,
73    UserList,
74    AclRuleAdd,
75    AclRuleRemove,
76    AclRuleList,
77    AclRoleAdd,
78    AclRoleDelete,
79    AclRoleList,
80    AclAssignmentAssign,
81    AclAssignmentUnassign,
82    AclAssignmentList,
83    IndexAdd { entity: String },
84    Catalog,
85    VaultEnable,
86    VaultUnlock,
87    VaultLock,
88    VaultDisable,
89    VaultChange,
90    VaultStatus,
91    PasswordChange,
92    PasswordResetStart,
93    PasswordResetSubmit,
94}
95
96const MAX_ENTITY_NAME_LEN: usize = 128;
97const MAX_RECORD_ID_LEN: usize = 512;
98const MAX_PAYLOAD_SIZE: usize = 4 * 1024 * 1024;
99
100fn is_valid_entity_name(name: &str) -> bool {
101    !name.is_empty()
102        && name.len() <= MAX_ENTITY_NAME_LEN
103        && name
104            .bytes()
105            .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-')
106}
107
108fn is_valid_record_id(id: &str) -> bool {
109    !id.is_empty()
110        && id.len() <= MAX_RECORD_ID_LEN
111        && !id.contains('+')
112        && !id.contains('#')
113        && !id.contains('/')
114}
115
116type ListOptions = (
117    Vec<Filter>,
118    Vec<SortOrder>,
119    Option<Pagination>,
120    Vec<String>,
121    Option<Vec<String>>,
122);
123
124#[allow(clippy::must_use_candidate)]
125pub fn parse_admin_topic(topic: &str) -> Option<AdminOperation> {
126    if topic == "$DB/_health" {
127        return Some(AdminOperation::Health);
128    }
129
130    if let Some(rest) = topic.strip_prefix("$DB/_sub/") {
131        let parts: Vec<&str> = rest.split('/').collect();
132        return match parts.as_slice() {
133            ["subscribe"] => Some(AdminOperation::Subscribe),
134            [id, "heartbeat"] => Some(AdminOperation::Heartbeat {
135                sub_id: (*id).to_string(),
136            }),
137            [id, "unsubscribe"] => Some(AdminOperation::Unsubscribe {
138                sub_id: (*id).to_string(),
139            }),
140            _ => None,
141        };
142    }
143
144    if let Some(rest) = topic.strip_prefix("$DB/_vault/") {
145        return match rest {
146            "enable" => Some(AdminOperation::VaultEnable),
147            "unlock" => Some(AdminOperation::VaultUnlock),
148            "lock" => Some(AdminOperation::VaultLock),
149            "disable" => Some(AdminOperation::VaultDisable),
150            "change" => Some(AdminOperation::VaultChange),
151            "status" => Some(AdminOperation::VaultStatus),
152            _ => None,
153        };
154    }
155
156    if let Some(rest) = topic.strip_prefix("$DB/_auth/") {
157        return match rest {
158            "password/change" => Some(AdminOperation::PasswordChange),
159            "password/reset/start" => Some(AdminOperation::PasswordResetStart),
160            "password/reset/submit" => Some(AdminOperation::PasswordResetSubmit),
161            _ => None,
162        };
163    }
164
165    let parts: Vec<&str> = topic.strip_prefix("$DB/_admin/")?.split('/').collect();
166
167    match parts.as_slice() {
168        ["schema", entity, "set"] if is_valid_entity_name(entity) => {
169            Some(AdminOperation::SchemaSet {
170                entity: (*entity).to_string(),
171            })
172        }
173        ["schema", entity, "get"] if is_valid_entity_name(entity) => {
174            Some(AdminOperation::SchemaGet {
175                entity: (*entity).to_string(),
176            })
177        }
178        ["constraint", entity, "add"] if is_valid_entity_name(entity) => {
179            Some(AdminOperation::ConstraintAdd {
180                entity: (*entity).to_string(),
181            })
182        }
183        ["constraint", entity, "list"] if is_valid_entity_name(entity) => {
184            Some(AdminOperation::ConstraintList {
185                entity: (*entity).to_string(),
186            })
187        }
188        ["index", entity, "add"] if is_valid_entity_name(entity) => {
189            Some(AdminOperation::IndexAdd {
190                entity: (*entity).to_string(),
191            })
192        }
193        ["backup"] => Some(AdminOperation::Backup),
194        ["backup", "list"] => Some(AdminOperation::BackupList),
195        ["restore"] => Some(AdminOperation::Restore),
196        ["consumer-groups"] => Some(AdminOperation::ConsumerGroupList),
197        ["consumer-groups", name] => Some(AdminOperation::ConsumerGroupShow {
198            name: (*name).to_string(),
199        }),
200        ["users", "add"] => Some(AdminOperation::UserAdd),
201        ["users", "delete"] => Some(AdminOperation::UserDelete),
202        ["users", "list"] => Some(AdminOperation::UserList),
203        ["acl", "rules", "add"] => Some(AdminOperation::AclRuleAdd),
204        ["acl", "rules", "remove"] => Some(AdminOperation::AclRuleRemove),
205        ["acl", "rules", "list"] => Some(AdminOperation::AclRuleList),
206        ["acl", "roles", "add"] => Some(AdminOperation::AclRoleAdd),
207        ["acl", "roles", "delete"] => Some(AdminOperation::AclRoleDelete),
208        ["acl", "roles", "list"] => Some(AdminOperation::AclRoleList),
209        ["acl", "assignments", "assign"] => Some(AdminOperation::AclAssignmentAssign),
210        ["acl", "assignments", "unassign"] => Some(AdminOperation::AclAssignmentUnassign),
211        ["acl", "assignments", "list"] => Some(AdminOperation::AclAssignmentList),
212        ["catalog"] => Some(AdminOperation::Catalog),
213        _ => None,
214    }
215}
216
217#[allow(clippy::must_use_candidate)]
218pub fn parse_db_topic(topic: &str) -> Option<DbOperation> {
219    let parts: Vec<&str> = topic.strip_prefix("$DB/")?.split('/').collect();
220
221    match parts.as_slice() {
222        [entity, "create"] if is_valid_entity_name(entity) => Some(DbOperation {
223            entity: (*entity).to_string(),
224            operation: DbOp::Create,
225            id: None,
226        }),
227        [entity, "list"] if is_valid_entity_name(entity) => Some(DbOperation {
228            entity: (*entity).to_string(),
229            operation: DbOp::List,
230            id: None,
231        }),
232        [entity, "shared"] if is_valid_entity_name(entity) => Some(DbOperation {
233            entity: (*entity).to_string(),
234            operation: DbOp::Shared,
235            id: None,
236        }),
237        [entity, id, "share"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
238            Some(DbOperation {
239                entity: (*entity).to_string(),
240                operation: DbOp::Share,
241                id: Some((*id).to_string()),
242            })
243        }
244        [entity, id, "unshare"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
245            Some(DbOperation {
246                entity: (*entity).to_string(),
247                operation: DbOp::Unshare,
248                id: Some((*id).to_string()),
249            })
250        }
251        [entity, id, "shares"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
252            Some(DbOperation {
253                entity: (*entity).to_string(),
254                operation: DbOp::Shares,
255                id: Some((*id).to_string()),
256            })
257        }
258        [entity, id] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
259            Some(DbOperation {
260                entity: (*entity).to_string(),
261                operation: DbOp::Read,
262                id: Some((*id).to_string()),
263            })
264        }
265        [entity, id, "update"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
266            Some(DbOperation {
267                entity: (*entity).to_string(),
268                operation: DbOp::Update,
269                id: Some((*id).to_string()),
270            })
271        }
272        [entity, id, "delete"] if is_valid_entity_name(entity) && is_valid_record_id(id) => {
273            Some(DbOperation {
274                entity: (*entity).to_string(),
275                operation: DbOp::Delete,
276                id: Some((*id).to_string()),
277            })
278        }
279        _ => None,
280    }
281}
282
283/// Builds a database request from an operation descriptor and payload.
284///
285/// # Errors
286/// Returns an error if JSON deserialization fails or a required ID is missing.
287pub fn build_request(op: DbOperation, payload: &[u8]) -> Result<Request, ProtocolError> {
288    if payload.len() > MAX_PAYLOAD_SIZE {
289        return Err(ProtocolError::PayloadTooLarge(payload.len()));
290    }
291    let data: Value = if payload.is_empty() {
292        Value::Null
293    } else {
294        serde_json::from_slice(payload)?
295    };
296
297    match op.operation {
298        DbOp::Create => Ok(Request::Create {
299            entity: op.entity,
300            data,
301        }),
302        DbOp::Read => {
303            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Read))?;
304            let (includes, projection) = extract_read_options(&data);
305            Ok(Request::Read {
306                entity: op.entity,
307                id,
308                includes,
309                projection,
310            })
311        }
312        DbOp::Update => {
313            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Update))?;
314            Ok(Request::Update {
315                entity: op.entity,
316                id,
317                fields: data,
318            })
319        }
320        DbOp::Delete => {
321            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Delete))?;
322            Ok(Request::Delete {
323                entity: op.entity,
324                id,
325            })
326        }
327        DbOp::List => {
328            let (filters, sort, pagination, includes, projection) = extract_list_options(&data);
329            Ok(Request::List {
330                entity: op.entity,
331                filters,
332                sort,
333                pagination,
334                includes,
335                projection,
336            })
337        }
338        DbOp::Share => {
339            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Share))?;
340            let grantee = data
341                .get("grantee")
342                .and_then(Value::as_str)
343                .unwrap_or_default()
344                .to_string();
345            let permission = data
346                .get("permission")
347                .and_then(Value::as_str)
348                .unwrap_or("view")
349                .to_string();
350            let cascade = data.get("cascade").and_then(Value::as_bool).unwrap_or(true);
351            Ok(Request::Share {
352                entity: op.entity,
353                id,
354                grantee,
355                permission,
356                cascade,
357            })
358        }
359        DbOp::Unshare => {
360            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Unshare))?;
361            let grantee = data
362                .get("grantee")
363                .and_then(Value::as_str)
364                .unwrap_or_default()
365                .to_string();
366            let cascade = data.get("cascade").and_then(Value::as_bool).unwrap_or(true);
367            Ok(Request::Unshare {
368                entity: op.entity,
369                id,
370                grantee,
371                cascade,
372            })
373        }
374        DbOp::Shares => {
375            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Shares))?;
376            Ok(Request::Shares {
377                entity: op.entity,
378                id,
379            })
380        }
381        DbOp::Shared => Ok(Request::Shared { entity: op.entity }),
382    }
383}
384
385fn extract_read_options(data: &Value) -> (Vec<String>, Option<Vec<String>>) {
386    let includes = data
387        .get("includes")
388        .and_then(|v| v.as_array())
389        .map(|arr| {
390            arr.iter()
391                .filter_map(|v| v.as_str().map(String::from))
392                .collect()
393        })
394        .unwrap_or_default();
395
396    let projection = data
397        .get("projection")
398        .and_then(|v| v.as_array())
399        .map(|arr| {
400            arr.iter()
401                .filter_map(|v| v.as_str().map(String::from))
402                .collect()
403        });
404
405    (includes, projection)
406}
407
408#[allow(clippy::cast_possible_truncation)]
409fn extract_list_options(data: &Value) -> ListOptions {
410    let filters: Vec<Filter> = data
411        .get("filters")
412        .and_then(|v| serde_json::from_value(v.clone()).ok())
413        .unwrap_or_default();
414
415    let sort: Vec<SortOrder> = data
416        .get("sort")
417        .and_then(|v| serde_json::from_value(v.clone()).ok())
418        .unwrap_or_default();
419
420    let pagination: Option<Pagination> = data
421        .get("pagination")
422        .and_then(|v| serde_json::from_value(v.clone()).ok())
423        .or_else(|| {
424            let limit = data
425                .get("limit")
426                .and_then(serde_json::Value::as_u64)
427                .map(|v| v as usize);
428            let offset = data
429                .get("offset")
430                .and_then(serde_json::Value::as_u64)
431                .map(|v| v as usize);
432            match (limit, offset) {
433                (Some(l), Some(o)) => Some(Pagination::new(l, o)),
434                (Some(l), None) => Some(Pagination::new(l, 0)),
435                (None, Some(o)) => Some(Pagination::new(usize::MAX, o)),
436                (None, None) => None,
437            }
438        });
439
440    let includes = data
441        .get("includes")
442        .and_then(|v| v.as_array())
443        .map(|arr| {
444            arr.iter()
445                .filter_map(|v| v.as_str().map(String::from))
446                .collect()
447        })
448        .unwrap_or_default();
449
450    let projection = data
451        .get("projection")
452        .and_then(|v| v.as_array())
453        .map(|arr| {
454            arr.iter()
455                .filter_map(|v| v.as_str().map(String::from))
456                .collect()
457        });
458
459    (filters, sort, pagination, includes, projection)
460}
461
462#[cfg(test)]
463mod tests {
464    use super::*;
465
466    #[test]
467    fn test_parse_db_topic_create() {
468        let op = parse_db_topic("$DB/users/create").unwrap();
469        assert_eq!(op.entity, "users");
470        assert_eq!(op.operation, DbOp::Create);
471        assert!(op.id.is_none());
472    }
473
474    #[test]
475    fn test_parse_db_topic_read() {
476        let op = parse_db_topic("$DB/users/123").unwrap();
477        assert_eq!(op.entity, "users");
478        assert_eq!(op.operation, DbOp::Read);
479        assert_eq!(op.id, Some("123".to_string()));
480    }
481
482    #[test]
483    fn test_parse_db_topic_update() {
484        let op = parse_db_topic("$DB/users/123/update").unwrap();
485        assert_eq!(op.entity, "users");
486        assert_eq!(op.operation, DbOp::Update);
487        assert_eq!(op.id, Some("123".to_string()));
488    }
489
490    #[test]
491    fn test_parse_db_topic_delete() {
492        let op = parse_db_topic("$DB/users/123/delete").unwrap();
493        assert_eq!(op.entity, "users");
494        assert_eq!(op.operation, DbOp::Delete);
495        assert_eq!(op.id, Some("123".to_string()));
496    }
497
498    #[test]
499    fn test_parse_db_topic_list() {
500        let op = parse_db_topic("$DB/users/list").unwrap();
501        assert_eq!(op.entity, "users");
502        assert_eq!(op.operation, DbOp::List);
503        assert!(op.id.is_none());
504    }
505
506    #[test]
507    fn test_parse_db_topic_invalid() {
508        assert!(parse_db_topic("invalid/topic").is_none());
509        assert!(parse_db_topic("$DB").is_none());
510        assert!(parse_db_topic("$DB/").is_none());
511    }
512
513    #[test]
514    fn test_parse_db_topic_share_family() {
515        let grant_op = parse_db_topic("$DB/diagrams/abc/share").unwrap();
516        assert_eq!(grant_op.entity, "diagrams");
517        assert_eq!(grant_op.operation, DbOp::Share);
518        assert_eq!(grant_op.id, Some("abc".to_string()));
519
520        let revoke_op = parse_db_topic("$DB/diagrams/abc/unshare").unwrap();
521        assert_eq!(revoke_op.operation, DbOp::Unshare);
522
523        let list_grants_op = parse_db_topic("$DB/diagrams/abc/shares").unwrap();
524        assert_eq!(list_grants_op.operation, DbOp::Shares);
525        assert_eq!(list_grants_op.id, Some("abc".to_string()));
526
527        let discovery_op = parse_db_topic("$DB/diagrams/shared").unwrap();
528        assert_eq!(discovery_op.operation, DbOp::Shared);
529        assert!(discovery_op.id.is_none());
530    }
531
532    #[test]
533    fn test_build_share_request() {
534        let op = DbOperation {
535            entity: "diagrams".to_string(),
536            operation: DbOp::Share,
537            id: Some("abc".to_string()),
538        };
539        let payload = br#"{"grantee":"bob","permission":"edit"}"#;
540        match build_request(op, payload).unwrap() {
541            Request::Share {
542                entity,
543                id,
544                grantee,
545                permission,
546                cascade,
547            } => {
548                assert_eq!(entity, "diagrams");
549                assert_eq!(id, "abc");
550                assert_eq!(grantee, "bob");
551                assert_eq!(permission, "edit");
552                assert!(cascade, "cascade defaults to true");
553            }
554            other => panic!("expected Share, got {other:?}"),
555        }
556    }
557
558    #[test]
559    fn test_build_create_request() {
560        let op = DbOperation {
561            entity: "users".to_string(),
562            operation: DbOp::Create,
563            id: None,
564        };
565        let payload = br#"{"name": "Alice"}"#;
566        let request = build_request(op, payload).unwrap();
567
568        match request {
569            Request::Create { entity, data } => {
570                assert_eq!(entity, "users");
571                assert_eq!(data["name"], "Alice");
572            }
573            _ => panic!("expected Create request"),
574        }
575    }
576
577    #[test]
578    fn test_build_read_request() {
579        let op = DbOperation {
580            entity: "users".to_string(),
581            operation: DbOp::Read,
582            id: Some("123".to_string()),
583        };
584        let payload = br#"{"projection": ["name", "email"]}"#;
585        let request = build_request(op, payload).unwrap();
586
587        match request {
588            Request::Read {
589                entity,
590                id,
591                projection,
592                ..
593            } => {
594                assert_eq!(entity, "users");
595                assert_eq!(id, "123");
596                assert_eq!(
597                    projection,
598                    Some(vec!["name".to_string(), "email".to_string()])
599                );
600            }
601            _ => panic!("expected Read request"),
602        }
603    }
604
605    #[test]
606    fn test_build_list_request() {
607        let op = DbOperation {
608            entity: "users".to_string(),
609            operation: DbOp::List,
610            id: None,
611        };
612        let payload = br#"{"filters": [{"field": "age", "op": "gt", "value": 18}]}"#;
613        let request = build_request(op, payload).unwrap();
614
615        match request {
616            Request::List {
617                entity, filters, ..
618            } => {
619                assert_eq!(entity, "users");
620                assert_eq!(filters.len(), 1);
621                assert_eq!(filters[0].field, "age");
622            }
623            _ => panic!("expected List request"),
624        }
625    }
626
627    #[test]
628    fn test_read_without_id_fails() {
629        let op = DbOperation {
630            entity: "users".to_string(),
631            operation: DbOp::Read,
632            id: None,
633        };
634        let result = build_request(op, &[]);
635        assert!(result.is_err());
636    }
637
638    #[test]
639    fn test_parse_admin_topic_health() {
640        let op = parse_admin_topic("$DB/_health").unwrap();
641        assert!(matches!(op, AdminOperation::Health));
642    }
643
644    #[test]
645    fn test_parse_admin_topic_catalog() {
646        let op = parse_admin_topic("$DB/_admin/catalog").unwrap();
647        assert!(matches!(op, AdminOperation::Catalog));
648    }
649
650    #[test]
651    fn test_parse_vault_topics() {
652        assert!(matches!(
653            parse_admin_topic("$DB/_vault/enable"),
654            Some(AdminOperation::VaultEnable)
655        ));
656        assert!(matches!(
657            parse_admin_topic("$DB/_vault/unlock"),
658            Some(AdminOperation::VaultUnlock)
659        ));
660        assert!(matches!(
661            parse_admin_topic("$DB/_vault/lock"),
662            Some(AdminOperation::VaultLock)
663        ));
664        assert!(matches!(
665            parse_admin_topic("$DB/_vault/disable"),
666            Some(AdminOperation::VaultDisable)
667        ));
668        assert!(matches!(
669            parse_admin_topic("$DB/_vault/change"),
670            Some(AdminOperation::VaultChange)
671        ));
672        assert!(matches!(
673            parse_admin_topic("$DB/_vault/status"),
674            Some(AdminOperation::VaultStatus)
675        ));
676        assert!(parse_admin_topic("$DB/_vault/unknown").is_none());
677    }
678
679    #[test]
680    fn test_parse_auth_topics() {
681        assert!(matches!(
682            parse_admin_topic("$DB/_auth/password/change"),
683            Some(AdminOperation::PasswordChange)
684        ));
685        assert!(matches!(
686            parse_admin_topic("$DB/_auth/password/reset/start"),
687            Some(AdminOperation::PasswordResetStart)
688        ));
689        assert!(matches!(
690            parse_admin_topic("$DB/_auth/password/reset/submit"),
691            Some(AdminOperation::PasswordResetSubmit)
692        ));
693        assert!(parse_admin_topic("$DB/_auth/unknown").is_none());
694        assert!(parse_admin_topic("$DB/_auth/password/reset/other").is_none());
695    }
696
697    #[test]
698    fn entity_name_validation() {
699        assert!(parse_db_topic("$DB/users/create").is_some());
700        assert!(parse_db_topic("$DB/my-entity/create").is_some());
701        assert!(parse_db_topic("$DB/my_entity/create").is_some());
702        assert!(parse_db_topic("$DB/Entity123/create").is_some());
703        assert!(parse_db_topic("$DB//create").is_none());
704        assert!(parse_db_topic("$DB/has space/create").is_none());
705        assert!(parse_db_topic("$DB/has.dot/create").is_none());
706        assert!(parse_db_topic(&format!("$DB/{}/create", "a".repeat(129))).is_none());
707        assert!(parse_db_topic(&format!("$DB/{}/create", "a".repeat(128))).is_some());
708    }
709
710    #[test]
711    fn record_id_validation() {
712        assert!(parse_db_topic("$DB/users/valid-id").is_some());
713        assert!(parse_db_topic("$DB/users/abc123").is_some());
714        assert!(parse_db_topic("$DB/users/+").is_none());
715        assert!(parse_db_topic("$DB/users/#").is_none());
716        let long_id = "x".repeat(513);
717        assert!(parse_db_topic(&format!("$DB/users/{long_id}")).is_none());
718        let ok_id = "x".repeat(512);
719        assert!(parse_db_topic(&format!("$DB/users/{ok_id}")).is_some());
720    }
721
722    #[test]
723    fn admin_entity_name_validation() {
724        assert!(parse_admin_topic("$DB/_admin/schema/users/set").is_some());
725        assert!(parse_admin_topic("$DB/_admin/schema/has space/set").is_none());
726        assert!(parse_admin_topic("$DB/_admin/constraint/has.dot/add").is_none());
727        assert!(parse_admin_topic(&format!("$DB/_admin/index/{}/add", "a".repeat(129))).is_none());
728    }
729
730    #[test]
731    fn payload_too_large_rejected() {
732        let op = DbOperation {
733            entity: "users".to_string(),
734            operation: DbOp::Create,
735            id: None,
736        };
737        let big_payload = vec![b'{'; MAX_PAYLOAD_SIZE + 1];
738        let result = build_request(op, &big_payload);
739        assert!(matches!(result, Err(ProtocolError::PayloadTooLarge(_))));
740    }
741
742    #[test]
743    fn test_extract_list_options_with_eq_filter() {
744        let payload = serde_json::json!({
745            "filters": [{"field": "email", "op": "eq", "value": "alice@example.com"}]
746        });
747        let (filters, sort, pagination, includes, projection) = extract_list_options(&payload);
748        assert_eq!(filters.len(), 1);
749        assert_eq!(filters[0].field, "email");
750        assert!(matches!(filters[0].op, crate::FilterOp::Eq));
751        assert_eq!(filters[0].value, serde_json::json!("alice@example.com"));
752        assert!(sort.is_empty());
753        assert!(pagination.is_none());
754        assert!(includes.is_empty());
755        assert!(projection.is_none());
756    }
757}