1use serde_json::{Number, Value as JsonValue};
8use std::collections::HashMap;
9use tracing::{debug, trace, warn};
10
11#[derive(Debug, Clone, PartialEq)]
13pub enum WireMessage {
14 Push(WireExpression),
16
17 Pull(i64),
19
20 Resolve(i64, WireExpression),
22
23 Reject(i64, WireExpression),
25
26 Release(Vec<i64>),
28
29 Abort(WireExpression),
31}
32
33#[derive(Debug, Clone, PartialEq)]
35pub enum WireExpression {
36 Null,
38 Bool(bool),
39 Number(Number),
40 String(String),
41 Array(Vec<WireExpression>),
42 Object(HashMap<String, WireExpression>),
43
44 Error {
47 error_type: String,
48 message: String,
49 stack: Option<String>,
50 },
51
52 Import(i64),
54
55 Export {
57 id: i64,
58 is_promise: bool,
59 },
60
61 Promise(i64),
63
64 Pipeline {
66 import_id: i64,
67 property_path: Option<Vec<PropertyKey>>,
68 args: Option<Box<WireExpression>>,
69 },
70
71 Call {
73 cap_id: i64,
74 property_path: Vec<PropertyKey>,
75 args: Box<WireExpression>,
76 },
77
78 Date(f64),
80
81 Remap(JsonValue), CapRef(i64),
86}
87
88#[derive(Debug, Clone, PartialEq)]
89pub enum PropertyKey {
90 String(String),
91 Number(usize),
92}
93
94impl WireMessage {
95 pub fn from_json_array(arr: &[JsonValue]) -> Result<Self, String> {
97 trace!(
98 "Parsing wire message from JSON array with {} elements",
99 arr.len()
100 );
101
102 if arr.is_empty() {
103 warn!("Attempted to parse empty message array");
104 return Err("Empty message array".into());
105 }
106
107 let msg_type = arr[0].as_str().ok_or_else(|| {
108 warn!("Message type is not a string: {:?}", arr[0]);
109 "Message type must be a string".to_string()
110 })?;
111
112 debug!("Parsing message type: {}", msg_type);
113
114 match msg_type {
115 "push" => {
116 if arr.len() != 2 {
117 warn!("push message has {} elements, expected 2", arr.len());
118 return Err("push requires exactly 2 elements".into());
119 }
120 trace!("Parsing push expression: {:?}", arr[1]);
121 let expr = WireExpression::from_json(&arr[1])?;
122 Ok(WireMessage::Push(expr))
123 }
124
125 "pull" => {
126 if arr.len() != 2 {
127 warn!("pull message has {} elements, expected 2", arr.len());
128 return Err("pull requires exactly 2 elements".into());
129 }
130 trace!("Parsing pull with import ID: {:?}", arr[1]);
131 let id = arr[1]
132 .as_i64()
133 .ok_or_else(|| "pull requires an integer import ID".to_string())?;
134 Ok(WireMessage::Pull(id))
135 }
136
137 "resolve" => {
138 if arr.len() != 3 {
139 return Err("resolve requires exactly 3 elements".into());
140 }
141 let id = arr[1]
142 .as_i64()
143 .ok_or_else(|| "resolve requires an integer export ID".to_string())?;
144 let value = WireExpression::from_json(&arr[2])?;
145 Ok(WireMessage::Resolve(id, value))
146 }
147
148 "reject" => {
149 if arr.len() != 3 {
150 return Err("reject requires exactly 3 elements".into());
151 }
152 let id = arr[1]
153 .as_i64()
154 .ok_or_else(|| "reject requires an integer export ID".to_string())?;
155 let error = WireExpression::from_json(&arr[2])?;
156 Ok(WireMessage::Reject(id, error))
157 }
158
159 "release" => {
160 if arr.len() != 2 {
161 return Err("release requires exactly 2 elements".into());
162 }
163 let ids = arr[1]
164 .as_array()
165 .ok_or_else(|| "release requires an array of import IDs".to_string())?
166 .iter()
167 .map(|v| {
168 v.as_i64()
169 .ok_or_else(|| "release IDs must be integers".to_string())
170 })
171 .collect::<Result<Vec<_>, _>>()?;
172 Ok(WireMessage::Release(ids))
173 }
174
175 "abort" => {
176 if arr.len() != 2 {
177 return Err("abort requires exactly 2 elements".into());
178 }
179 let error = WireExpression::from_json(&arr[1])?;
180 Ok(WireMessage::Abort(error))
181 }
182
183 _ => {
184 warn!("Unknown message type: {}", msg_type);
185 Err(format!("Unknown message type: {}", msg_type))
186 }
187 }
188 }
189
190 pub fn to_json_array(&self) -> Vec<JsonValue> {
192 match self {
193 WireMessage::Push(expr) => {
194 vec![JsonValue::String("push".into()), expr.to_json()]
195 }
196 WireMessage::Pull(id) => {
197 vec![
198 JsonValue::String("pull".into()),
199 JsonValue::Number(Number::from(*id)),
200 ]
201 }
202 WireMessage::Resolve(id, value) => {
203 vec![
204 JsonValue::String("resolve".into()),
205 JsonValue::Number(Number::from(*id)),
206 value.to_json(),
207 ]
208 }
209 WireMessage::Reject(id, error) => {
210 vec![
211 JsonValue::String("reject".into()),
212 JsonValue::Number(Number::from(*id)),
213 error.to_json(),
214 ]
215 }
216 WireMessage::Release(ids) => {
217 vec![
218 JsonValue::String("release".into()),
219 JsonValue::Array(
220 ids.iter()
221 .map(|id| JsonValue::Number(Number::from(*id)))
222 .collect(),
223 ),
224 ]
225 }
226 WireMessage::Abort(error) => {
227 vec![JsonValue::String("abort".into()), error.to_json()]
228 }
229 }
230 }
231}
232
233impl WireExpression {
234 pub fn from_json(value: &JsonValue) -> Result<Self, String> {
236 trace!("Parsing expression from JSON: {:?}", value);
237
238 match value {
239 JsonValue::Null => Ok(WireExpression::Null),
240 JsonValue::Bool(b) => Ok(WireExpression::Bool(*b)),
241 JsonValue::Number(n) => Ok(WireExpression::Number(n.clone())),
242 JsonValue::String(s) => Ok(WireExpression::String(s.clone())),
243
244 JsonValue::Array(arr) if !arr.is_empty() => {
245 if let Some(JsonValue::String(type_str)) = arr.first() {
247 debug!("Parsing special form: {}", type_str);
248 match type_str.as_str() {
249 "error" => {
250 if arr.len() < 3 || arr.len() > 4 {
251 return Err("error requires 3-4 elements".into());
252 }
253 let error_type = arr[1]
254 .as_str()
255 .ok_or("error type must be string")?
256 .to_string();
257 let message = arr[2]
258 .as_str()
259 .ok_or("error message must be string")?
260 .to_string();
261 let stack = arr.get(3).and_then(|v| v.as_str()).map(|s| s.to_string());
262 Ok(WireExpression::Error {
263 error_type,
264 message,
265 stack,
266 })
267 }
268
269 "import" => {
270 if arr.len() != 2 {
271 return Err("import requires exactly 2 elements".into());
272 }
273 let id = arr[1].as_i64().ok_or("import ID must be integer")?;
274 Ok(WireExpression::Import(id))
275 }
276
277 "export" => {
278 if arr.len() < 2 || arr.len() > 3 {
279 return Err("export requires 2-3 elements".into());
280 }
281 let id = arr[1].as_i64().ok_or("export ID must be integer")?;
282 let is_promise = arr.get(2).and_then(|v| v.as_bool()).unwrap_or(false);
283 Ok(WireExpression::Export { id, is_promise })
284 }
285
286 "promise" => {
287 if arr.len() != 2 {
288 return Err("promise requires exactly 2 elements".into());
289 }
290 let id = arr[1].as_i64().ok_or("promise ID must be integer")?;
291 Ok(WireExpression::Promise(id))
292 }
293
294 "pipeline" => {
295 if arr.len() < 2 || arr.len() > 4 {
296 warn!("pipeline has {} elements, expected 2-4", arr.len());
297 return Err("pipeline requires 2-4 elements".into());
298 }
299 let import_id = arr[1]
300 .as_i64()
301 .ok_or("pipeline import ID must be integer")?;
302
303 trace!("Pipeline: import_id={}, elements={}", import_id, arr.len());
304
305 let property_path = arr
306 .get(2)
307 .and_then(|v| v.as_array())
308 .map(|path| {
309 path.iter()
310 .map(|key| {
311 if let Some(s) = key.as_str() {
312 Ok(PropertyKey::String(s.to_string()))
313 } else if let Some(n) = key.as_u64() {
314 Ok(PropertyKey::Number(n as usize))
315 } else {
316 Err("Property key must be string or number"
317 .to_string())
318 }
319 })
320 .collect::<Result<Vec<_>, _>>()
321 })
322 .transpose()?;
323
324 let args = arr
325 .get(3)
326 .map(WireExpression::from_json)
327 .transpose()?
328 .map(Box::new);
329
330 Ok(WireExpression::Pipeline {
331 import_id,
332 property_path,
333 args,
334 })
335 }
336
337 "call" => {
338 if arr.len() != 4 {
339 warn!("call has {} elements, expected 4", arr.len());
340 return Err("call requires exactly 4 elements".into());
341 }
342 let cap_id = arr[1].as_i64().ok_or("call cap ID must be integer")?;
343
344 trace!("Call: cap_id={}, elements={}", cap_id, arr.len());
345
346 let property_path = arr[2]
347 .as_array()
348 .ok_or("call property path must be array")?
349 .iter()
350 .map(|key| {
351 if let Some(s) = key.as_str() {
352 Ok(PropertyKey::String(s.to_string()))
353 } else if let Some(n) = key.as_u64() {
354 Ok(PropertyKey::Number(n as usize))
355 } else {
356 Err("Property key must be string or number".to_string())
357 }
358 })
359 .collect::<Result<Vec<_>, _>>()?;
360
361 let args = Box::new(WireExpression::from_json(&arr[3])?);
362
363 Ok(WireExpression::Call {
364 cap_id,
365 property_path,
366 args,
367 })
368 }
369
370 "date" => {
371 if arr.len() != 2 {
372 return Err("date requires exactly 2 elements".into());
373 }
374 let timestamp =
375 arr[1].as_f64().ok_or("date timestamp must be number")?;
376 Ok(WireExpression::Date(timestamp))
377 }
378
379 "remap" => {
380 if arr.len() != 2 {
381 return Err("remap requires exactly 2 elements".into());
382 }
383 Ok(WireExpression::Remap(arr[1].clone()))
384 }
385
386 "capref" => {
387 if arr.len() != 2 {
388 return Err("capref requires exactly 2 elements".into());
389 }
390 let id = arr[1].as_i64().ok_or("capref ID must be integer")?;
391 Ok(WireExpression::CapRef(id))
392 }
393
394 _ => {
395 let items = arr
397 .iter()
398 .map(WireExpression::from_json)
399 .collect::<Result<Vec<_>, _>>()?;
400 Ok(WireExpression::Array(items))
401 }
402 }
403 } else {
404 if arr.len() == 1 {
407 if let Some(JsonValue::Array(inner)) = arr.first() {
408 let items = inner
410 .iter()
411 .map(WireExpression::from_json)
412 .collect::<Result<Vec<_>, _>>()?;
413 return Ok(WireExpression::Array(items));
414 }
415 }
416
417 let items = arr
419 .iter()
420 .map(WireExpression::from_json)
421 .collect::<Result<Vec<_>, _>>()?;
422 Ok(WireExpression::Array(items))
423 }
424 }
425
426 JsonValue::Array(_arr) => Ok(WireExpression::Array(vec![])), JsonValue::Object(obj) => {
429 let map = obj
430 .iter()
431 .map(|(k, v)| Ok((k.clone(), WireExpression::from_json(v)?)))
432 .collect::<Result<HashMap<_, _>, String>>()?;
433 Ok(WireExpression::Object(map))
434 }
435 }
436 }
437
438 pub fn to_json(&self) -> JsonValue {
440 match self {
441 WireExpression::Null => JsonValue::Null,
442 WireExpression::Bool(b) => JsonValue::Bool(*b),
443 WireExpression::Number(n) => JsonValue::Number(n.clone()),
444 WireExpression::String(s) => JsonValue::String(s.clone()),
445
446 WireExpression::Array(items) => {
447 let inner_array = items.iter().map(|e| e.to_json()).collect();
450 JsonValue::Array(vec![JsonValue::Array(inner_array)])
451 }
452
453 WireExpression::Object(map) => {
454 JsonValue::Object(map.iter().map(|(k, v)| (k.clone(), v.to_json())).collect())
455 }
456
457 WireExpression::Error {
458 error_type,
459 message,
460 stack,
461 } => {
462 let mut arr = vec![
463 JsonValue::String("error".into()),
464 JsonValue::String(error_type.clone()),
465 JsonValue::String(message.clone()),
466 ];
467 if let Some(s) = stack {
468 arr.push(JsonValue::String(s.clone()));
469 }
470 JsonValue::Array(arr)
471 }
472
473 WireExpression::Import(id) => JsonValue::Array(vec![
474 JsonValue::String("import".into()),
475 JsonValue::Number(Number::from(*id)),
476 ]),
477
478 WireExpression::Export { id, is_promise } => {
479 let mut arr = vec![
480 JsonValue::String("export".into()),
481 JsonValue::Number(Number::from(*id)),
482 ];
483 if *is_promise {
484 arr.push(JsonValue::Bool(true));
485 }
486 JsonValue::Array(arr)
487 }
488
489 WireExpression::Promise(id) => JsonValue::Array(vec![
490 JsonValue::String("promise".into()),
491 JsonValue::Number(Number::from(*id)),
492 ]),
493
494 WireExpression::Pipeline {
495 import_id,
496 property_path,
497 args,
498 } => {
499 let mut arr = vec![
500 JsonValue::String("pipeline".into()),
501 JsonValue::Number(Number::from(*import_id)),
502 ];
503
504 if let Some(path) = property_path {
505 let path_json: Vec<JsonValue> = path
506 .iter()
507 .map(|key| match key {
508 PropertyKey::String(s) => JsonValue::String(s.clone()),
509 PropertyKey::Number(n) => JsonValue::Number(Number::from(*n)),
510 })
511 .collect();
512 arr.push(JsonValue::Array(path_json));
513
514 if let Some(a) = args {
515 arr.push(a.to_json());
516 }
517 } else if let Some(a) = args {
518 arr.push(JsonValue::Array(vec![]));
520 arr.push(a.to_json());
521 }
522
523 JsonValue::Array(arr)
524 }
525
526 WireExpression::Date(timestamp) => JsonValue::Array(vec![
527 JsonValue::String("date".into()),
528 JsonValue::Number(Number::from_f64(*timestamp).unwrap_or_else(|| Number::from(0))), ]),
530
531 WireExpression::Remap(plan) => {
532 JsonValue::Array(vec![JsonValue::String("remap".into()), plan.clone()])
533 }
534
535 WireExpression::CapRef(id) => JsonValue::Array(vec![
536 JsonValue::String("capref".into()),
537 JsonValue::Number(Number::from(*id)),
538 ]),
539
540 WireExpression::Call {
541 cap_id,
542 property_path,
543 args,
544 } => {
545 let mut arr = vec![
546 JsonValue::String("call".into()),
547 JsonValue::Number(Number::from(*cap_id)),
548 ];
549
550 let path_json: Vec<JsonValue> = property_path
551 .iter()
552 .map(|key| match key {
553 PropertyKey::String(s) => JsonValue::String(s.clone()),
554 PropertyKey::Number(n) => JsonValue::Number(Number::from(*n)),
555 })
556 .collect();
557 arr.push(JsonValue::Array(path_json));
558 arr.push(args.to_json());
559
560 JsonValue::Array(arr)
561 }
562 }
563 }
564}
565
566pub fn parse_wire_batch(input: &str) -> Result<Vec<WireMessage>, String> {
568 debug!("Parsing wire batch, input length: {} chars", input.len());
569 let mut messages = Vec::new();
570 let mut line_num = 0;
571
572 for line in input.lines() {
573 line_num += 1;
574 let line = line.trim();
575 if line.is_empty() {
576 trace!("Skipping empty line {}", line_num);
577 continue;
578 }
579
580 trace!("Parsing line {}: {}", line_num, line);
581
582 let json: JsonValue = serde_json::from_str(line).map_err(|e| {
583 warn!("Failed to parse JSON on line {}: {}", line_num, e);
584 format!("Invalid JSON on line {}: {}", line_num, e)
585 })?;
586
587 let arr = json.as_array().ok_or_else(|| {
588 warn!("Line {} is not an array: {:?}", line_num, json);
589 format!("Message on line {} must be an array", line_num)
590 })?;
591
592 let msg = WireMessage::from_json_array(arr)?;
593 debug!(
594 "Successfully parsed message on line {}: {:?}",
595 line_num, msg
596 );
597 messages.push(msg);
598 }
599
600 debug!(
601 "Successfully parsed {} messages from wire batch",
602 messages.len()
603 );
604 Ok(messages)
605}
606
607pub fn serialize_wire_batch(messages: &[WireMessage]) -> String {
609 debug!("Serializing {} messages to wire format", messages.len());
610
611 let result = messages
612 .iter()
613 .enumerate()
614 .map(|(i, msg)| {
615 trace!("Serializing message {}: {:?}", i, msg);
616 let arr = msg.to_json_array();
617 serde_json::to_string(&arr).unwrap()
618 })
619 .collect::<Vec<_>>()
620 .join("\n");
621
622 debug!("Serialized wire batch: {} bytes", result.len());
623 result
624}
625
626#[cfg(test)]
627mod tests {
628 use super::*;
629
630 #[test]
631 fn test_parse_push_message() {
632 let input = r#"["push",["pipeline",0,["add"],[5,3]]]"#;
634 let json: JsonValue = serde_json::from_str(input).unwrap();
635 let arr = json.as_array().unwrap();
636 let msg = WireMessage::from_json_array(arr).unwrap();
637
638 match msg {
639 WireMessage::Push(WireExpression::Pipeline {
640 import_id,
641 property_path,
642 args,
643 }) => {
644 assert_eq!(import_id, 0);
645 assert_eq!(property_path, Some(vec![PropertyKey::String("add".into())]));
646 assert!(args.is_some());
647 }
648 _ => panic!("Expected Push with Pipeline"),
649 }
650 }
651
652 #[test]
653 fn test_parse_pull_message() {
654 let input = r#"["pull",1]"#;
655 let json: JsonValue = serde_json::from_str(input).unwrap();
656 let arr = json.as_array().unwrap();
657 let msg = WireMessage::from_json_array(arr).unwrap();
658
659 assert_eq!(msg, WireMessage::Pull(1));
660 }
661
662 #[test]
663 fn test_parse_batch() {
664 let input = r#"["push",["pipeline",0,["add"],[5,3]]]
666["pull",1]"#;
667 let messages = parse_wire_batch(input).unwrap();
668 assert_eq!(messages.len(), 2);
669
670 match &messages[0] {
672 WireMessage::Push(_) => {}
673 _ => panic!("Expected first message to be Push"),
674 }
675
676 match &messages[1] {
678 WireMessage::Pull(id) => assert_eq!(*id, 1),
679 _ => panic!("Expected second message to be Pull"),
680 }
681 }
682
683 #[test]
684 fn test_serialize_response() {
685 let messages = vec![WireMessage::Resolve(
687 1,
688 WireExpression::Number(serde_json::Number::from(8)),
689 )];
690
691 let output = serialize_wire_batch(&messages);
692 assert_eq!(output, r#"["resolve",1,8]"#);
693 }
694
695 #[test]
696 fn test_full_protocol_flow() {
697 let client_batch = r#"["push",["pipeline",0,["add"],[5,3]]]
701["pull",1]"#;
702 let client_messages = parse_wire_batch(client_batch).unwrap();
703 assert_eq!(client_messages.len(), 2);
704
705 let server_response = WireMessage::Resolve(
707 1, WireExpression::Number(serde_json::Number::from(8)),
709 );
710
711 let response_str = serialize_wire_batch(&[server_response]);
712 assert_eq!(response_str, r#"["resolve",1,8]"#);
713 }
714
715 #[test]
716 fn test_capref_wire_expression() {
717 let input = r#"["capref",42]"#;
719 let json: JsonValue = serde_json::from_str(input).unwrap();
720 let expr = WireExpression::from_json(&json).unwrap();
721
722 match expr {
723 WireExpression::CapRef(id) => assert_eq!(id, 42),
724 _ => panic!("Expected CapRef expression"),
725 }
726
727 let serialized = expr.to_json();
729 let expected = serde_json::json!(["capref", 42]);
730 assert_eq!(serialized, expected);
731 }
732
733 #[test]
734 fn test_capability_passing_in_args() {
735 let input = r#"["push",["pipeline",0,["method"],[["capref",5],"regular_arg"]]]"#;
737 let json: JsonValue = serde_json::from_str(input).unwrap();
738 let arr = json.as_array().unwrap();
739 let msg = WireMessage::from_json_array(arr).unwrap();
740
741 match msg {
742 WireMessage::Push(WireExpression::Pipeline {
743 import_id,
744 property_path,
745 args,
746 }) => {
747 assert_eq!(import_id, 0);
748 assert_eq!(
749 property_path,
750 Some(vec![PropertyKey::String("method".into())])
751 );
752
753 if let Some(args_expr) = args {
754 match args_expr.as_ref() {
755 WireExpression::Array(items) => {
756 assert_eq!(items.len(), 2);
757 match &items[0] {
758 WireExpression::CapRef(id) => assert_eq!(*id, 5),
759 _ => panic!("Expected first arg to be CapRef"),
760 }
761 match &items[1] {
762 WireExpression::String(s) => assert_eq!(s, "regular_arg"),
763 _ => panic!("Expected second arg to be string"),
764 }
765 }
766 _ => panic!("Expected args to be array"),
767 }
768 }
769 }
770 _ => panic!("Expected Push with Pipeline"),
771 }
772 }
773}