1use std::fmt;
5
6use crate::transport::Request;
7use crate::types::{Filter, Pagination, SortOrder};
8use serde_json::Value;
9
10#[derive(Debug, thiserror::Error)]
11pub enum ProtocolError {
12 #[error("missing required ID for {0} operation")]
13 MissingId(DbOp),
14 #[error("invalid JSON payload: {0}")]
15 InvalidPayload(#[from] serde_json::Error),
16}
17
/// The kind of data operation selected by the shape of a `$DB/...` topic.
///
/// The [`fmt::Display`] form is the lowercase operation name (`create`,
/// `read`, `update`, `delete`, `list`) and is what appears in
/// `ProtocolError::MissingId` messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbOp {
    /// Selected by `$DB/{entity}/create`.
    Create,
    /// Selected by `$DB/{entity}/{id}`.
    Read,
    /// Selected by `$DB/{entity}/{id}/update`.
    Update,
    /// Selected by `$DB/{entity}/{id}/delete`.
    Delete,
    /// Selected by `$DB/{entity}/list`.
    List,
}

impl fmt::Display for DbOp {
    /// Writes the lowercase operation name.
    ///
    /// Width, alignment and precision flags on the format spec are honoured
    /// (for example `{:>8}`), matching how `&str` itself formats.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Create => "create",
            Self::Read => "read",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::List => "list",
        };
        // `pad` applies the caller's formatting flags; `write_str` would
        // silently drop them. For a plain `{}` the output is identical.
        f.pad(name)
    }
}
38
39#[derive(Debug, Clone)]
40pub struct DbOperation {
41 pub entity: String,
42 pub operation: DbOp,
43 pub id: Option<String>,
44}
45
/// Non-CRUD operations recognised by `parse_admin_topic`.
///
/// Each variant notes the topic that produces it. Unless stated otherwise the
/// path shown is relative to the `$DB/_admin/` prefix.
#[derive(Debug, Clone)]
pub enum AdminOperation {
    /// `schema/{entity}/set`
    SchemaSet { entity: String },
    /// `schema/{entity}/get`
    SchemaGet { entity: String },
    /// `constraint/{entity}/add`
    ConstraintAdd { entity: String },
    /// `constraint/{entity}/list`
    ConstraintList { entity: String },
    /// `backup`
    Backup,
    /// `restore`
    Restore,
    /// `backup/list`
    BackupList,
    /// Full topic `$DB/_sub/subscribe`.
    Subscribe,
    /// Full topic `$DB/_sub/{sub_id}/heartbeat`.
    Heartbeat { sub_id: String },
    /// Full topic `$DB/_sub/{sub_id}/unsubscribe`.
    Unsubscribe { sub_id: String },
    /// `consumer-groups`
    ConsumerGroupList,
    /// `consumer-groups/{name}`
    ConsumerGroupShow { name: String },
    /// Full topic `$DB/_health`.
    Health,
    /// `users/add`
    UserAdd,
    /// `users/delete`
    UserDelete,
    /// `users/list`
    UserList,
    /// `acl/rules/add`
    AclRuleAdd,
    /// `acl/rules/remove`
    AclRuleRemove,
    /// `acl/rules/list`
    AclRuleList,
    /// `acl/roles/add`
    AclRoleAdd,
    /// `acl/roles/delete`
    AclRoleDelete,
    /// `acl/roles/list`
    AclRoleList,
    /// `acl/assignments/assign`
    AclAssignmentAssign,
    /// `acl/assignments/unassign`
    AclAssignmentUnassign,
    /// `acl/assignments/list`
    AclAssignmentList,
    /// `index/{entity}/add`
    IndexAdd { entity: String },
    /// `catalog`
    Catalog,
}
76
77type ListOptions = (
78 Vec<Filter>,
79 Vec<SortOrder>,
80 Option<Pagination>,
81 Vec<String>,
82 Option<Vec<String>>,
83);
84
85#[allow(clippy::must_use_candidate)]
86pub fn parse_admin_topic(topic: &str) -> Option<AdminOperation> {
87 if topic == "$DB/_health" {
88 return Some(AdminOperation::Health);
89 }
90
91 if let Some(rest) = topic.strip_prefix("$DB/_sub/") {
92 let parts: Vec<&str> = rest.split('/').collect();
93 return match parts.as_slice() {
94 ["subscribe"] => Some(AdminOperation::Subscribe),
95 [id, "heartbeat"] => Some(AdminOperation::Heartbeat {
96 sub_id: (*id).to_string(),
97 }),
98 [id, "unsubscribe"] => Some(AdminOperation::Unsubscribe {
99 sub_id: (*id).to_string(),
100 }),
101 _ => None,
102 };
103 }
104
105 let parts: Vec<&str> = topic.strip_prefix("$DB/_admin/")?.split('/').collect();
106
107 match parts.as_slice() {
108 ["schema", entity, "set"] => Some(AdminOperation::SchemaSet {
109 entity: (*entity).to_string(),
110 }),
111 ["schema", entity, "get"] => Some(AdminOperation::SchemaGet {
112 entity: (*entity).to_string(),
113 }),
114 ["constraint", entity, "add"] => Some(AdminOperation::ConstraintAdd {
115 entity: (*entity).to_string(),
116 }),
117 ["constraint", entity, "list"] => Some(AdminOperation::ConstraintList {
118 entity: (*entity).to_string(),
119 }),
120 ["index", entity, "add"] => Some(AdminOperation::IndexAdd {
121 entity: (*entity).to_string(),
122 }),
123 ["backup"] => Some(AdminOperation::Backup),
124 ["backup", "list"] => Some(AdminOperation::BackupList),
125 ["restore"] => Some(AdminOperation::Restore),
126 ["consumer-groups"] => Some(AdminOperation::ConsumerGroupList),
127 ["consumer-groups", name] => Some(AdminOperation::ConsumerGroupShow {
128 name: (*name).to_string(),
129 }),
130 ["users", "add"] => Some(AdminOperation::UserAdd),
131 ["users", "delete"] => Some(AdminOperation::UserDelete),
132 ["users", "list"] => Some(AdminOperation::UserList),
133 ["acl", "rules", "add"] => Some(AdminOperation::AclRuleAdd),
134 ["acl", "rules", "remove"] => Some(AdminOperation::AclRuleRemove),
135 ["acl", "rules", "list"] => Some(AdminOperation::AclRuleList),
136 ["acl", "roles", "add"] => Some(AdminOperation::AclRoleAdd),
137 ["acl", "roles", "delete"] => Some(AdminOperation::AclRoleDelete),
138 ["acl", "roles", "list"] => Some(AdminOperation::AclRoleList),
139 ["acl", "assignments", "assign"] => Some(AdminOperation::AclAssignmentAssign),
140 ["acl", "assignments", "unassign"] => Some(AdminOperation::AclAssignmentUnassign),
141 ["acl", "assignments", "list"] => Some(AdminOperation::AclAssignmentList),
142 ["catalog"] => Some(AdminOperation::Catalog),
143 _ => None,
144 }
145}
146
147#[allow(clippy::must_use_candidate)]
148pub fn parse_db_topic(topic: &str) -> Option<DbOperation> {
149 let parts: Vec<&str> = topic.strip_prefix("$DB/")?.split('/').collect();
150
151 match parts.as_slice() {
152 [entity, "create"] => Some(DbOperation {
153 entity: (*entity).to_string(),
154 operation: DbOp::Create,
155 id: None,
156 }),
157 [entity, "list"] => Some(DbOperation {
158 entity: (*entity).to_string(),
159 operation: DbOp::List,
160 id: None,
161 }),
162 [entity, id] => Some(DbOperation {
163 entity: (*entity).to_string(),
164 operation: DbOp::Read,
165 id: Some((*id).to_string()),
166 }),
167 [entity, id, "update"] => Some(DbOperation {
168 entity: (*entity).to_string(),
169 operation: DbOp::Update,
170 id: Some((*id).to_string()),
171 }),
172 [entity, id, "delete"] => Some(DbOperation {
173 entity: (*entity).to_string(),
174 operation: DbOp::Delete,
175 id: Some((*id).to_string()),
176 }),
177 _ => None,
178 }
179}
180
181pub fn build_request(op: DbOperation, payload: &[u8]) -> Result<Request, ProtocolError> {
186 let data: Value = if payload.is_empty() {
187 Value::Null
188 } else {
189 serde_json::from_slice(payload)?
190 };
191
192 match op.operation {
193 DbOp::Create => Ok(Request::Create {
194 entity: op.entity,
195 data,
196 }),
197 DbOp::Read => {
198 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Read))?;
199 let (includes, projection) = extract_read_options(&data);
200 Ok(Request::Read {
201 entity: op.entity,
202 id,
203 includes,
204 projection,
205 })
206 }
207 DbOp::Update => {
208 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Update))?;
209 Ok(Request::Update {
210 entity: op.entity,
211 id,
212 fields: data,
213 })
214 }
215 DbOp::Delete => {
216 let id = op.id.ok_or(ProtocolError::MissingId(DbOp::Delete))?;
217 Ok(Request::Delete {
218 entity: op.entity,
219 id,
220 })
221 }
222 DbOp::List => {
223 let (filters, sort, pagination, includes, projection) = extract_list_options(&data);
224 Ok(Request::List {
225 entity: op.entity,
226 filters,
227 sort,
228 pagination,
229 includes,
230 projection,
231 })
232 }
233 }
234}
235
236fn extract_read_options(data: &Value) -> (Vec<String>, Option<Vec<String>>) {
237 let includes = data
238 .get("includes")
239 .and_then(|v| v.as_array())
240 .map(|arr| {
241 arr.iter()
242 .filter_map(|v| v.as_str().map(String::from))
243 .collect()
244 })
245 .unwrap_or_default();
246
247 let projection = data
248 .get("projection")
249 .and_then(|v| v.as_array())
250 .map(|arr| {
251 arr.iter()
252 .filter_map(|v| v.as_str().map(String::from))
253 .collect()
254 });
255
256 (includes, projection)
257}
258
259#[allow(clippy::cast_possible_truncation)]
260fn extract_list_options(data: &Value) -> ListOptions {
261 let filters: Vec<Filter> = data
262 .get("filters")
263 .and_then(|v| serde_json::from_value(v.clone()).ok())
264 .unwrap_or_default();
265
266 let sort: Vec<SortOrder> = data
267 .get("sort")
268 .and_then(|v| serde_json::from_value(v.clone()).ok())
269 .unwrap_or_default();
270
271 let pagination: Option<Pagination> = data
272 .get("pagination")
273 .and_then(|v| serde_json::from_value(v.clone()).ok())
274 .or_else(|| {
275 let limit = data
276 .get("limit")
277 .and_then(serde_json::Value::as_u64)
278 .map(|v| v as usize);
279 let offset = data
280 .get("offset")
281 .and_then(serde_json::Value::as_u64)
282 .map(|v| v as usize);
283 match (limit, offset) {
284 (Some(l), Some(o)) => Some(Pagination::new(l, o)),
285 (Some(l), None) => Some(Pagination::new(l, 0)),
286 (None, Some(o)) => Some(Pagination::new(usize::MAX, o)),
287 (None, None) => None,
288 }
289 });
290
291 let includes = data
292 .get("includes")
293 .and_then(|v| v.as_array())
294 .map(|arr| {
295 arr.iter()
296 .filter_map(|v| v.as_str().map(String::from))
297 .collect()
298 })
299 .unwrap_or_default();
300
301 let projection = data
302 .get("projection")
303 .and_then(|v| v.as_array())
304 .map(|arr| {
305 arr.iter()
306 .filter_map(|v| v.as_str().map(String::from))
307 .collect()
308 });
309
310 (filters, sort, pagination, includes, projection)
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a `DbOperation` on the `users` entity for the request tests.
    fn users_op(operation: DbOp, id: Option<&str>) -> DbOperation {
        DbOperation {
            entity: "users".to_string(),
            operation,
            id: id.map(String::from),
        }
    }

    // Parses `topic` and checks it decodes to the given operation/ID on the
    // `users` entity.
    fn assert_users_topic(topic: &str, operation: DbOp, id: Option<&str>) {
        let parsed = parse_db_topic(topic).expect("topic should parse");
        assert_eq!(parsed.entity, "users");
        assert_eq!(parsed.operation, operation);
        assert_eq!(parsed.id.as_deref(), id);
    }

    #[test]
    fn test_parse_db_topic_create() {
        assert_users_topic("$DB/users/create", DbOp::Create, None);
    }

    #[test]
    fn test_parse_db_topic_read() {
        assert_users_topic("$DB/users/123", DbOp::Read, Some("123"));
    }

    #[test]
    fn test_parse_db_topic_update() {
        assert_users_topic("$DB/users/123/update", DbOp::Update, Some("123"));
    }

    #[test]
    fn test_parse_db_topic_delete() {
        assert_users_topic("$DB/users/123/delete", DbOp::Delete, Some("123"));
    }

    #[test]
    fn test_parse_db_topic_list() {
        assert_users_topic("$DB/users/list", DbOp::List, None);
    }

    #[test]
    fn test_parse_db_topic_invalid() {
        // Missing prefix, bare prefix, and prefix with a single empty segment.
        for topic in &["invalid/topic", "$DB", "$DB/"] {
            assert!(parse_db_topic(topic).is_none());
        }
    }

    #[test]
    fn test_build_create_request() {
        let request = build_request(users_op(DbOp::Create, None), br#"{"name": "Alice"}"#)
            .expect("create request should build");

        match request {
            Request::Create { entity, data } => {
                assert_eq!(entity, "users");
                assert_eq!(data["name"], "Alice");
            }
            _ => panic!("expected Create request"),
        }
    }

    #[test]
    fn test_build_read_request() {
        let request = build_request(
            users_op(DbOp::Read, Some("123")),
            br#"{"projection": ["name", "email"]}"#,
        )
        .expect("read request should build");

        match request {
            Request::Read {
                entity,
                id,
                projection,
                ..
            } => {
                assert_eq!(entity, "users");
                assert_eq!(id, "123");
                let expected = vec!["name".to_string(), "email".to_string()];
                assert_eq!(projection, Some(expected));
            }
            _ => panic!("expected Read request"),
        }
    }

    #[test]
    fn test_build_list_request() {
        let request = build_request(
            users_op(DbOp::List, None),
            br#"{"filters": [{"field": "age", "op": "gt", "value": 18}]}"#,
        )
        .expect("list request should build");

        match request {
            Request::List {
                entity, filters, ..
            } => {
                assert_eq!(entity, "users");
                assert_eq!(filters.len(), 1);
                assert_eq!(filters[0].field, "age");
            }
            _ => panic!("expected List request"),
        }
    }

    #[test]
    fn test_read_without_id_fails() {
        // A read needs an ID; with none supplied the builder must refuse.
        assert!(build_request(users_op(DbOp::Read, None), &[]).is_err());
    }

    #[test]
    fn test_parse_admin_topic_health() {
        let parsed = parse_admin_topic("$DB/_health").expect("health topic should parse");
        assert!(matches!(parsed, AdminOperation::Health));
    }

    #[test]
    fn test_parse_admin_topic_catalog() {
        let parsed = parse_admin_topic("$DB/_admin/catalog").expect("catalog topic should parse");
        assert!(matches!(parsed, AdminOperation::Catalog));
    }

    #[test]
    fn test_extract_list_options_with_eq_filter() {
        let payload = serde_json::json!({
            "filters": [{"field": "email", "op": "eq", "value": "alice@example.com"}]
        });

        let (filters, sort, pagination, includes, projection) = extract_list_options(&payload);

        // Exactly one filter, decoded field by field.
        assert_eq!(filters.len(), 1);
        let only = &filters[0];
        assert_eq!(only.field, "email");
        assert!(matches!(only.op, crate::FilterOp::Eq));
        assert_eq!(only.value, serde_json::json!("alice@example.com"));

        // Every option not present in the payload falls back to its default.
        assert!(sort.is_empty());
        assert!(pagination.is_none());
        assert!(includes.is_empty());
        assert!(projection.is_none());
    }
}