Skip to main content

mqdb_core/protocol/
mod.rs

1// Copyright 2025-2026 LabOverWire. All rights reserved.
2// SPDX-License-Identifier: AGPL-3.0-only
3
4use std::fmt;
5
6use crate::transport::Request;
7use crate::types::{Filter, Pagination, SortOrder};
8use serde_json::Value;
9
10#[derive(Debug, thiserror::Error)]
11pub enum ProtocolError {
12    #[error("missing required ID for {0} operation")]
13    MissingId(DbOp),
14    #[error("invalid JSON payload: {0}")]
15    InvalidPayload(#[from] serde_json::Error),
16}
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19pub enum DbOp {
20    Create,
21    Read,
22    Update,
23    Delete,
24    List,
25}
26
27impl fmt::Display for DbOp {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        match self {
30            Self::Create => f.write_str("create"),
31            Self::Read => f.write_str("read"),
32            Self::Update => f.write_str("update"),
33            Self::Delete => f.write_str("delete"),
34            Self::List => f.write_str("list"),
35        }
36    }
37}
38
39#[derive(Debug, Clone)]
40pub struct DbOperation {
41    pub entity: String,
42    pub operation: DbOp,
43    pub id: Option<String>,
44}
45
46#[derive(Debug, Clone)]
47pub enum AdminOperation {
48    SchemaSet { entity: String },
49    SchemaGet { entity: String },
50    ConstraintAdd { entity: String },
51    ConstraintList { entity: String },
52    Backup,
53    Restore,
54    BackupList,
55    Subscribe,
56    Heartbeat { sub_id: String },
57    Unsubscribe { sub_id: String },
58    ConsumerGroupList,
59    ConsumerGroupShow { name: String },
60    Health,
61    UserAdd,
62    UserDelete,
63    UserList,
64    AclRuleAdd,
65    AclRuleRemove,
66    AclRuleList,
67    AclRoleAdd,
68    AclRoleDelete,
69    AclRoleList,
70    AclAssignmentAssign,
71    AclAssignmentUnassign,
72    AclAssignmentList,
73    IndexAdd { entity: String },
74    Catalog,
75}
76
77type ListOptions = (
78    Vec<Filter>,
79    Vec<SortOrder>,
80    Option<Pagination>,
81    Vec<String>,
82    Option<Vec<String>>,
83);
84
85#[allow(clippy::must_use_candidate)]
86pub fn parse_admin_topic(topic: &str) -> Option<AdminOperation> {
87    if topic == "$DB/_health" {
88        return Some(AdminOperation::Health);
89    }
90
91    if let Some(rest) = topic.strip_prefix("$DB/_sub/") {
92        let parts: Vec<&str> = rest.split('/').collect();
93        return match parts.as_slice() {
94            ["subscribe"] => Some(AdminOperation::Subscribe),
95            [id, "heartbeat"] => Some(AdminOperation::Heartbeat {
96                sub_id: (*id).to_string(),
97            }),
98            [id, "unsubscribe"] => Some(AdminOperation::Unsubscribe {
99                sub_id: (*id).to_string(),
100            }),
101            _ => None,
102        };
103    }
104
105    let parts: Vec<&str> = topic.strip_prefix("$DB/_admin/")?.split('/').collect();
106
107    match parts.as_slice() {
108        ["schema", entity, "set"] => Some(AdminOperation::SchemaSet {
109            entity: (*entity).to_string(),
110        }),
111        ["schema", entity, "get"] => Some(AdminOperation::SchemaGet {
112            entity: (*entity).to_string(),
113        }),
114        ["constraint", entity, "add"] => Some(AdminOperation::ConstraintAdd {
115            entity: (*entity).to_string(),
116        }),
117        ["constraint", entity, "list"] => Some(AdminOperation::ConstraintList {
118            entity: (*entity).to_string(),
119        }),
120        ["index", entity, "add"] => Some(AdminOperation::IndexAdd {
121            entity: (*entity).to_string(),
122        }),
123        ["backup"] => Some(AdminOperation::Backup),
124        ["backup", "list"] => Some(AdminOperation::BackupList),
125        ["restore"] => Some(AdminOperation::Restore),
126        ["consumer-groups"] => Some(AdminOperation::ConsumerGroupList),
127        ["consumer-groups", name] => Some(AdminOperation::ConsumerGroupShow {
128            name: (*name).to_string(),
129        }),
130        ["users", "add"] => Some(AdminOperation::UserAdd),
131        ["users", "delete"] => Some(AdminOperation::UserDelete),
132        ["users", "list"] => Some(AdminOperation::UserList),
133        ["acl", "rules", "add"] => Some(AdminOperation::AclRuleAdd),
134        ["acl", "rules", "remove"] => Some(AdminOperation::AclRuleRemove),
135        ["acl", "rules", "list"] => Some(AdminOperation::AclRuleList),
136        ["acl", "roles", "add"] => Some(AdminOperation::AclRoleAdd),
137        ["acl", "roles", "delete"] => Some(AdminOperation::AclRoleDelete),
138        ["acl", "roles", "list"] => Some(AdminOperation::AclRoleList),
139        ["acl", "assignments", "assign"] => Some(AdminOperation::AclAssignmentAssign),
140        ["acl", "assignments", "unassign"] => Some(AdminOperation::AclAssignmentUnassign),
141        ["acl", "assignments", "list"] => Some(AdminOperation::AclAssignmentList),
142        ["catalog"] => Some(AdminOperation::Catalog),
143        _ => None,
144    }
145}
146
147#[allow(clippy::must_use_candidate)]
148pub fn parse_db_topic(topic: &str) -> Option<DbOperation> {
149    let parts: Vec<&str> = topic.strip_prefix("$DB/")?.split('/').collect();
150
151    match parts.as_slice() {
152        [entity, "create"] => Some(DbOperation {
153            entity: (*entity).to_string(),
154            operation: DbOp::Create,
155            id: None,
156        }),
157        [entity, "list"] => Some(DbOperation {
158            entity: (*entity).to_string(),
159            operation: DbOp::List,
160            id: None,
161        }),
162        [entity, id] => Some(DbOperation {
163            entity: (*entity).to_string(),
164            operation: DbOp::Read,
165            id: Some((*id).to_string()),
166        }),
167        [entity, id, "update"] => Some(DbOperation {
168            entity: (*entity).to_string(),
169            operation: DbOp::Update,
170            id: Some((*id).to_string()),
171        }),
172        [entity, id, "delete"] => Some(DbOperation {
173            entity: (*entity).to_string(),
174            operation: DbOp::Delete,
175            id: Some((*id).to_string()),
176        }),
177        _ => None,
178    }
179}
180
181/// Builds a database request from an operation descriptor and payload.
182///
183/// # Errors
184/// Returns an error if JSON deserialization fails or a required ID is missing.
185pub fn build_request(op: DbOperation, payload: &[u8]) -> Result<Request, ProtocolError> {
186    let data: Value = if payload.is_empty() {
187        Value::Null
188    } else {
189        serde_json::from_slice(payload)?
190    };
191
192    match op.operation {
193        DbOp::Create => Ok(Request::Create {
194            entity: op.entity,
195            data,
196        }),
197        DbOp::Read => {
198            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Read))?;
199            let (includes, projection) = extract_read_options(&data);
200            Ok(Request::Read {
201                entity: op.entity,
202                id,
203                includes,
204                projection,
205            })
206        }
207        DbOp::Update => {
208            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Update))?;
209            Ok(Request::Update {
210                entity: op.entity,
211                id,
212                fields: data,
213            })
214        }
215        DbOp::Delete => {
216            let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Delete))?;
217            Ok(Request::Delete {
218                entity: op.entity,
219                id,
220            })
221        }
222        DbOp::List => {
223            let (filters, sort, pagination, includes, projection) = extract_list_options(&data);
224            Ok(Request::List {
225                entity: op.entity,
226                filters,
227                sort,
228                pagination,
229                includes,
230                projection,
231            })
232        }
233    }
234}
235
236fn extract_read_options(data: &Value) -> (Vec<String>, Option<Vec<String>>) {
237    let includes = data
238        .get("includes")
239        .and_then(|v| v.as_array())
240        .map(|arr| {
241            arr.iter()
242                .filter_map(|v| v.as_str().map(String::from))
243                .collect()
244        })
245        .unwrap_or_default();
246
247    let projection = data
248        .get("projection")
249        .and_then(|v| v.as_array())
250        .map(|arr| {
251            arr.iter()
252                .filter_map(|v| v.as_str().map(String::from))
253                .collect()
254        });
255
256    (includes, projection)
257}
258
259#[allow(clippy::cast_possible_truncation)]
260fn extract_list_options(data: &Value) -> ListOptions {
261    let filters: Vec<Filter> = data
262        .get("filters")
263        .and_then(|v| serde_json::from_value(v.clone()).ok())
264        .unwrap_or_default();
265
266    let sort: Vec<SortOrder> = data
267        .get("sort")
268        .and_then(|v| serde_json::from_value(v.clone()).ok())
269        .unwrap_or_default();
270
271    let pagination: Option<Pagination> = data
272        .get("pagination")
273        .and_then(|v| serde_json::from_value(v.clone()).ok())
274        .or_else(|| {
275            let limit = data
276                .get("limit")
277                .and_then(serde_json::Value::as_u64)
278                .map(|v| v as usize);
279            let offset = data
280                .get("offset")
281                .and_then(serde_json::Value::as_u64)
282                .map(|v| v as usize);
283            match (limit, offset) {
284                (Some(l), Some(o)) => Some(Pagination::new(l, o)),
285                (Some(l), None) => Some(Pagination::new(l, 0)),
286                (None, Some(o)) => Some(Pagination::new(usize::MAX, o)),
287                (None, None) => None,
288            }
289        });
290
291    let includes = data
292        .get("includes")
293        .and_then(|v| v.as_array())
294        .map(|arr| {
295            arr.iter()
296                .filter_map(|v| v.as_str().map(String::from))
297                .collect()
298        })
299        .unwrap_or_default();
300
301    let projection = data
302        .get("projection")
303        .and_then(|v| v.as_array())
304        .map(|arr| {
305            arr.iter()
306                .filter_map(|v| v.as_str().map(String::from))
307                .collect()
308        });
309
310    (filters, sort, pagination, includes, projection)
311}
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316
317    #[test]
318    fn test_parse_db_topic_create() {
319        let op = parse_db_topic("$DB/users/create").unwrap();
320        assert_eq!(op.entity, "users");
321        assert_eq!(op.operation, DbOp::Create);
322        assert!(op.id.is_none());
323    }
324
325    #[test]
326    fn test_parse_db_topic_read() {
327        let op = parse_db_topic("$DB/users/123").unwrap();
328        assert_eq!(op.entity, "users");
329        assert_eq!(op.operation, DbOp::Read);
330        assert_eq!(op.id, Some("123".to_string()));
331    }
332
333    #[test]
334    fn test_parse_db_topic_update() {
335        let op = parse_db_topic("$DB/users/123/update").unwrap();
336        assert_eq!(op.entity, "users");
337        assert_eq!(op.operation, DbOp::Update);
338        assert_eq!(op.id, Some("123".to_string()));
339    }
340
341    #[test]
342    fn test_parse_db_topic_delete() {
343        let op = parse_db_topic("$DB/users/123/delete").unwrap();
344        assert_eq!(op.entity, "users");
345        assert_eq!(op.operation, DbOp::Delete);
346        assert_eq!(op.id, Some("123".to_string()));
347    }
348
349    #[test]
350    fn test_parse_db_topic_list() {
351        let op = parse_db_topic("$DB/users/list").unwrap();
352        assert_eq!(op.entity, "users");
353        assert_eq!(op.operation, DbOp::List);
354        assert!(op.id.is_none());
355    }
356
357    #[test]
358    fn test_parse_db_topic_invalid() {
359        assert!(parse_db_topic("invalid/topic").is_none());
360        assert!(parse_db_topic("$DB").is_none());
361        assert!(parse_db_topic("$DB/").is_none());
362    }
363
364    #[test]
365    fn test_build_create_request() {
366        let op = DbOperation {
367            entity: "users".to_string(),
368            operation: DbOp::Create,
369            id: None,
370        };
371        let payload = br#"{"name": "Alice"}"#;
372        let request = build_request(op, payload).unwrap();
373
374        match request {
375            Request::Create { entity, data } => {
376                assert_eq!(entity, "users");
377                assert_eq!(data["name"], "Alice");
378            }
379            _ => panic!("expected Create request"),
380        }
381    }
382
383    #[test]
384    fn test_build_read_request() {
385        let op = DbOperation {
386            entity: "users".to_string(),
387            operation: DbOp::Read,
388            id: Some("123".to_string()),
389        };
390        let payload = br#"{"projection": ["name", "email"]}"#;
391        let request = build_request(op, payload).unwrap();
392
393        match request {
394            Request::Read {
395                entity,
396                id,
397                projection,
398                ..
399            } => {
400                assert_eq!(entity, "users");
401                assert_eq!(id, "123");
402                assert_eq!(
403                    projection,
404                    Some(vec!["name".to_string(), "email".to_string()])
405                );
406            }
407            _ => panic!("expected Read request"),
408        }
409    }
410
411    #[test]
412    fn test_build_list_request() {
413        let op = DbOperation {
414            entity: "users".to_string(),
415            operation: DbOp::List,
416            id: None,
417        };
418        let payload = br#"{"filters": [{"field": "age", "op": "gt", "value": 18}]}"#;
419        let request = build_request(op, payload).unwrap();
420
421        match request {
422            Request::List {
423                entity, filters, ..
424            } => {
425                assert_eq!(entity, "users");
426                assert_eq!(filters.len(), 1);
427                assert_eq!(filters[0].field, "age");
428            }
429            _ => panic!("expected List request"),
430        }
431    }
432
433    #[test]
434    fn test_read_without_id_fails() {
435        let op = DbOperation {
436            entity: "users".to_string(),
437            operation: DbOp::Read,
438            id: None,
439        };
440        let result = build_request(op, &[]);
441        assert!(result.is_err());
442    }
443
444    #[test]
445    fn test_parse_admin_topic_health() {
446        let op = parse_admin_topic("$DB/_health").unwrap();
447        assert!(matches!(op, AdminOperation::Health));
448    }
449
450    #[test]
451    fn test_parse_admin_topic_catalog() {
452        let op = parse_admin_topic("$DB/_admin/catalog").unwrap();
453        assert!(matches!(op, AdminOperation::Catalog));
454    }
455
456    #[test]
457    fn test_extract_list_options_with_eq_filter() {
458        let payload = serde_json::json!({
459            "filters": [{"field": "email", "op": "eq", "value": "alice@example.com"}]
460        });
461        let (filters, sort, pagination, includes, projection) = extract_list_options(&payload);
462        assert_eq!(filters.len(), 1);
463        assert_eq!(filters[0].field, "email");
464        assert!(matches!(filters[0].op, crate::FilterOp::Eq));
465        assert_eq!(filters[0].value, serde_json::json!("alice@example.com"));
466        assert!(sort.is_empty());
467        assert!(pagination.is_none());
468        assert!(includes.is_empty());
469        assert!(projection.is_none());
470    }
471}