capnweb_core/protocol/
wire.rs

1// Cap'n Web Official Wire Protocol Implementation
2// Specification: https://github.com/cloudflare/capnweb/blob/main/protocol.md
3//
4// The protocol uses newline-delimited JSON arrays as messages.
5// Each message is an array where the first element is the message type.
6
7use serde_json::{Number, Value as JsonValue};
8use std::collections::HashMap;
9use tracing::{debug, trace, warn};
10
11/// Wire protocol message types
12#[derive(Debug, Clone, PartialEq)]
13pub enum WireMessage {
14    /// ["push", expression] - Push an expression for evaluation
15    Push(WireExpression),
16
17    /// ["pull", import_id] - Pull a promise to get its resolved value
18    Pull(i64),
19
20    /// ["resolve", export_id, value] - Resolve a promise with a value
21    Resolve(i64, WireExpression),
22
23    /// ["reject", export_id, error] - Reject a promise with an error
24    Reject(i64, WireExpression),
25
26    /// ["release", [import_ids...]] - Release/dispose capabilities
27    Release(Vec<i64>),
28
29    /// ["abort", error] - Abort the session with an error
30    Abort(WireExpression),
31}
32
33/// Wire protocol expressions
34#[derive(Debug, Clone, PartialEq)]
35pub enum WireExpression {
36    // Literal values
37    Null,
38    Bool(bool),
39    Number(Number),
40    String(String),
41    Array(Vec<WireExpression>),
42    Object(HashMap<String, WireExpression>),
43
44    // Special forms (represented as arrays in the protocol)
45    /// ["error", type, message, stack?]
46    Error {
47        error_type: String,
48        message: String,
49        stack: Option<String>,
50    },
51
52    /// ["import", id]
53    Import(i64),
54
55    /// ["export", id, promise?]
56    Export {
57        id: i64,
58        is_promise: bool,
59    },
60
61    /// ["promise", id]
62    Promise(i64),
63
64    /// ["pipeline", import_id, property_path?, args?]
65    Pipeline {
66        import_id: i64,
67        property_path: Option<Vec<PropertyKey>>,
68        args: Option<Box<WireExpression>>,
69    },
70
71    /// ["call", cap_id, property_path, args]
72    Call {
73        cap_id: i64,
74        property_path: Vec<PropertyKey>,
75        args: Box<WireExpression>,
76    },
77
78    /// ["date", timestamp]
79    Date(f64),
80
81    /// ["remap", plan]
82    Remap(JsonValue), // IL plan, keep as raw JSON for now
83
84    /// ["capref", id] - Reference to a capability for marshaling
85    CapRef(i64),
86}
87
/// One segment of a property path: either a named property or a numeric index.
///
/// Both payload types are totally ordered by equality and hashable, so the key
/// derives `Eq` and `Hash` in addition to `PartialEq`; this lets property paths
/// be used as keys in hashed collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PropertyKey {
    /// Named property.
    String(String),
    /// Non-negative numeric index.
    Number(usize),
}
93
94impl WireMessage {
95    /// Parse a wire message from a JSON array
96    pub fn from_json_array(arr: &[JsonValue]) -> Result<Self, String> {
97        trace!(
98            "Parsing wire message from JSON array with {} elements",
99            arr.len()
100        );
101
102        if arr.is_empty() {
103            warn!("Attempted to parse empty message array");
104            return Err("Empty message array".into());
105        }
106
107        let msg_type = arr[0].as_str().ok_or_else(|| {
108            warn!("Message type is not a string: {:?}", arr[0]);
109            "Message type must be a string".to_string()
110        })?;
111
112        debug!("Parsing message type: {}", msg_type);
113
114        match msg_type {
115            "push" => {
116                if arr.len() != 2 {
117                    warn!("push message has {} elements, expected 2", arr.len());
118                    return Err("push requires exactly 2 elements".into());
119                }
120                trace!("Parsing push expression: {:?}", arr[1]);
121                let expr = WireExpression::from_json(&arr[1])?;
122                Ok(WireMessage::Push(expr))
123            }
124
125            "pull" => {
126                if arr.len() != 2 {
127                    warn!("pull message has {} elements, expected 2", arr.len());
128                    return Err("pull requires exactly 2 elements".into());
129                }
130                trace!("Parsing pull with import ID: {:?}", arr[1]);
131                let id = arr[1]
132                    .as_i64()
133                    .ok_or_else(|| "pull requires an integer import ID".to_string())?;
134                Ok(WireMessage::Pull(id))
135            }
136
137            "resolve" => {
138                if arr.len() != 3 {
139                    return Err("resolve requires exactly 3 elements".into());
140                }
141                let id = arr[1]
142                    .as_i64()
143                    .ok_or_else(|| "resolve requires an integer export ID".to_string())?;
144                let value = WireExpression::from_json(&arr[2])?;
145                Ok(WireMessage::Resolve(id, value))
146            }
147
148            "reject" => {
149                if arr.len() != 3 {
150                    return Err("reject requires exactly 3 elements".into());
151                }
152                let id = arr[1]
153                    .as_i64()
154                    .ok_or_else(|| "reject requires an integer export ID".to_string())?;
155                let error = WireExpression::from_json(&arr[2])?;
156                Ok(WireMessage::Reject(id, error))
157            }
158
159            "release" => {
160                if arr.len() != 2 {
161                    return Err("release requires exactly 2 elements".into());
162                }
163                let ids = arr[1]
164                    .as_array()
165                    .ok_or_else(|| "release requires an array of import IDs".to_string())?
166                    .iter()
167                    .map(|v| {
168                        v.as_i64()
169                            .ok_or_else(|| "release IDs must be integers".to_string())
170                    })
171                    .collect::<Result<Vec<_>, _>>()?;
172                Ok(WireMessage::Release(ids))
173            }
174
175            "abort" => {
176                if arr.len() != 2 {
177                    return Err("abort requires exactly 2 elements".into());
178                }
179                let error = WireExpression::from_json(&arr[1])?;
180                Ok(WireMessage::Abort(error))
181            }
182
183            _ => {
184                warn!("Unknown message type: {}", msg_type);
185                Err(format!("Unknown message type: {}", msg_type))
186            }
187        }
188    }
189
190    /// Convert to JSON array for wire format
191    pub fn to_json_array(&self) -> Vec<JsonValue> {
192        match self {
193            WireMessage::Push(expr) => {
194                vec![JsonValue::String("push".into()), expr.to_json()]
195            }
196            WireMessage::Pull(id) => {
197                vec![
198                    JsonValue::String("pull".into()),
199                    JsonValue::Number(Number::from(*id)),
200                ]
201            }
202            WireMessage::Resolve(id, value) => {
203                vec![
204                    JsonValue::String("resolve".into()),
205                    JsonValue::Number(Number::from(*id)),
206                    value.to_json(),
207                ]
208            }
209            WireMessage::Reject(id, error) => {
210                vec![
211                    JsonValue::String("reject".into()),
212                    JsonValue::Number(Number::from(*id)),
213                    error.to_json(),
214                ]
215            }
216            WireMessage::Release(ids) => {
217                vec![
218                    JsonValue::String("release".into()),
219                    JsonValue::Array(
220                        ids.iter()
221                            .map(|id| JsonValue::Number(Number::from(*id)))
222                            .collect(),
223                    ),
224                ]
225            }
226            WireMessage::Abort(error) => {
227                vec![JsonValue::String("abort".into()), error.to_json()]
228            }
229        }
230    }
231}
232
233impl WireExpression {
234    /// Parse an expression from JSON value
235    pub fn from_json(value: &JsonValue) -> Result<Self, String> {
236        trace!("Parsing expression from JSON: {:?}", value);
237
238        match value {
239            JsonValue::Null => Ok(WireExpression::Null),
240            JsonValue::Bool(b) => Ok(WireExpression::Bool(*b)),
241            JsonValue::Number(n) => Ok(WireExpression::Number(n.clone())),
242            JsonValue::String(s) => Ok(WireExpression::String(s.clone())),
243
244            JsonValue::Array(arr) if !arr.is_empty() => {
245                // Check if it's a special form
246                if let Some(JsonValue::String(type_str)) = arr.first() {
247                    debug!("Parsing special form: {}", type_str);
248                    match type_str.as_str() {
249                        "error" => {
250                            if arr.len() < 3 || arr.len() > 4 {
251                                return Err("error requires 3-4 elements".into());
252                            }
253                            let error_type = arr[1]
254                                .as_str()
255                                .ok_or("error type must be string")?
256                                .to_string();
257                            let message = arr[2]
258                                .as_str()
259                                .ok_or("error message must be string")?
260                                .to_string();
261                            let stack = arr.get(3).and_then(|v| v.as_str()).map(|s| s.to_string());
262                            Ok(WireExpression::Error {
263                                error_type,
264                                message,
265                                stack,
266                            })
267                        }
268
269                        "import" => {
270                            if arr.len() != 2 {
271                                return Err("import requires exactly 2 elements".into());
272                            }
273                            let id = arr[1].as_i64().ok_or("import ID must be integer")?;
274                            Ok(WireExpression::Import(id))
275                        }
276
277                        "export" => {
278                            if arr.len() < 2 || arr.len() > 3 {
279                                return Err("export requires 2-3 elements".into());
280                            }
281                            let id = arr[1].as_i64().ok_or("export ID must be integer")?;
282                            let is_promise = arr.get(2).and_then(|v| v.as_bool()).unwrap_or(false);
283                            Ok(WireExpression::Export { id, is_promise })
284                        }
285
286                        "promise" => {
287                            if arr.len() != 2 {
288                                return Err("promise requires exactly 2 elements".into());
289                            }
290                            let id = arr[1].as_i64().ok_or("promise ID must be integer")?;
291                            Ok(WireExpression::Promise(id))
292                        }
293
294                        "pipeline" => {
295                            if arr.len() < 2 || arr.len() > 4 {
296                                warn!("pipeline has {} elements, expected 2-4", arr.len());
297                                return Err("pipeline requires 2-4 elements".into());
298                            }
299                            let import_id = arr[1]
300                                .as_i64()
301                                .ok_or("pipeline import ID must be integer")?;
302
303                            trace!("Pipeline: import_id={}, elements={}", import_id, arr.len());
304
305                            let property_path = arr
306                                .get(2)
307                                .and_then(|v| v.as_array())
308                                .map(|path| {
309                                    path.iter()
310                                        .map(|key| {
311                                            if let Some(s) = key.as_str() {
312                                                Ok(PropertyKey::String(s.to_string()))
313                                            } else if let Some(n) = key.as_u64() {
314                                                Ok(PropertyKey::Number(n as usize))
315                                            } else {
316                                                Err("Property key must be string or number"
317                                                    .to_string())
318                                            }
319                                        })
320                                        .collect::<Result<Vec<_>, _>>()
321                                })
322                                .transpose()?;
323
324                            let args = arr
325                                .get(3)
326                                .map(WireExpression::from_json)
327                                .transpose()?
328                                .map(Box::new);
329
330                            Ok(WireExpression::Pipeline {
331                                import_id,
332                                property_path,
333                                args,
334                            })
335                        }
336
337                        "call" => {
338                            if arr.len() != 4 {
339                                warn!("call has {} elements, expected 4", arr.len());
340                                return Err("call requires exactly 4 elements".into());
341                            }
342                            let cap_id = arr[1].as_i64().ok_or("call cap ID must be integer")?;
343
344                            trace!("Call: cap_id={}, elements={}", cap_id, arr.len());
345
346                            let property_path = arr[2]
347                                .as_array()
348                                .ok_or("call property path must be array")?
349                                .iter()
350                                .map(|key| {
351                                    if let Some(s) = key.as_str() {
352                                        Ok(PropertyKey::String(s.to_string()))
353                                    } else if let Some(n) = key.as_u64() {
354                                        Ok(PropertyKey::Number(n as usize))
355                                    } else {
356                                        Err("Property key must be string or number".to_string())
357                                    }
358                                })
359                                .collect::<Result<Vec<_>, _>>()?;
360
361                            let args = Box::new(WireExpression::from_json(&arr[3])?);
362
363                            Ok(WireExpression::Call {
364                                cap_id,
365                                property_path,
366                                args,
367                            })
368                        }
369
370                        "date" => {
371                            if arr.len() != 2 {
372                                return Err("date requires exactly 2 elements".into());
373                            }
374                            let timestamp =
375                                arr[1].as_f64().ok_or("date timestamp must be number")?;
376                            Ok(WireExpression::Date(timestamp))
377                        }
378
379                        "remap" => {
380                            if arr.len() != 2 {
381                                return Err("remap requires exactly 2 elements".into());
382                            }
383                            Ok(WireExpression::Remap(arr[1].clone()))
384                        }
385
386                        "capref" => {
387                            if arr.len() != 2 {
388                                return Err("capref requires exactly 2 elements".into());
389                            }
390                            let id = arr[1].as_i64().ok_or("capref ID must be integer")?;
391                            Ok(WireExpression::CapRef(id))
392                        }
393
394                        _ => {
395                            // Not a special form, just a regular array
396                            let items = arr
397                                .iter()
398                                .map(WireExpression::from_json)
399                                .collect::<Result<Vec<_>, _>>()?;
400                            Ok(WireExpression::Array(items))
401                        }
402                    }
403                } else {
404                    // Check if it's an escaped array (Cap'n Web protocol wraps arrays in arrays)
405                    // If the array has exactly one element that is itself an array, unwrap it
406                    if arr.len() == 1 {
407                        if let Some(JsonValue::Array(inner)) = arr.first() {
408                            // This is an escaped array, unwrap it
409                            let items = inner
410                                .iter()
411                                .map(WireExpression::from_json)
412                                .collect::<Result<Vec<_>, _>>()?;
413                            return Ok(WireExpression::Array(items));
414                        }
415                    }
416
417                    // Regular array (not escaped)
418                    let items = arr
419                        .iter()
420                        .map(WireExpression::from_json)
421                        .collect::<Result<Vec<_>, _>>()?;
422                    Ok(WireExpression::Array(items))
423                }
424            }
425
426            JsonValue::Array(_arr) => Ok(WireExpression::Array(vec![])), // Empty array
427
428            JsonValue::Object(obj) => {
429                let map = obj
430                    .iter()
431                    .map(|(k, v)| Ok((k.clone(), WireExpression::from_json(v)?)))
432                    .collect::<Result<HashMap<_, _>, String>>()?;
433                Ok(WireExpression::Object(map))
434            }
435        }
436    }
437
438    /// Convert to JSON for wire format
439    pub fn to_json(&self) -> JsonValue {
440        match self {
441            WireExpression::Null => JsonValue::Null,
442            WireExpression::Bool(b) => JsonValue::Bool(*b),
443            WireExpression::Number(n) => JsonValue::Number(n.clone()),
444            WireExpression::String(s) => JsonValue::String(s.clone()),
445
446            WireExpression::Array(items) => {
447                // Cap'n Web protocol requires arrays to be "escaped" by wrapping in another array
448                // This matches the TypeScript client's expectation for array serialization
449                let inner_array = items.iter().map(|e| e.to_json()).collect();
450                JsonValue::Array(vec![JsonValue::Array(inner_array)])
451            }
452
453            WireExpression::Object(map) => {
454                JsonValue::Object(map.iter().map(|(k, v)| (k.clone(), v.to_json())).collect())
455            }
456
457            WireExpression::Error {
458                error_type,
459                message,
460                stack,
461            } => {
462                let mut arr = vec![
463                    JsonValue::String("error".into()),
464                    JsonValue::String(error_type.clone()),
465                    JsonValue::String(message.clone()),
466                ];
467                if let Some(s) = stack {
468                    arr.push(JsonValue::String(s.clone()));
469                }
470                JsonValue::Array(arr)
471            }
472
473            WireExpression::Import(id) => JsonValue::Array(vec![
474                JsonValue::String("import".into()),
475                JsonValue::Number(Number::from(*id)),
476            ]),
477
478            WireExpression::Export { id, is_promise } => {
479                let mut arr = vec![
480                    JsonValue::String("export".into()),
481                    JsonValue::Number(Number::from(*id)),
482                ];
483                if *is_promise {
484                    arr.push(JsonValue::Bool(true));
485                }
486                JsonValue::Array(arr)
487            }
488
489            WireExpression::Promise(id) => JsonValue::Array(vec![
490                JsonValue::String("promise".into()),
491                JsonValue::Number(Number::from(*id)),
492            ]),
493
494            WireExpression::Pipeline {
495                import_id,
496                property_path,
497                args,
498            } => {
499                let mut arr = vec![
500                    JsonValue::String("pipeline".into()),
501                    JsonValue::Number(Number::from(*import_id)),
502                ];
503
504                if let Some(path) = property_path {
505                    let path_json: Vec<JsonValue> = path
506                        .iter()
507                        .map(|key| match key {
508                            PropertyKey::String(s) => JsonValue::String(s.clone()),
509                            PropertyKey::Number(n) => JsonValue::Number(Number::from(*n)),
510                        })
511                        .collect();
512                    arr.push(JsonValue::Array(path_json));
513
514                    if let Some(a) = args {
515                        arr.push(a.to_json());
516                    }
517                } else if let Some(a) = args {
518                    // If no property path but has args, need empty array for path
519                    arr.push(JsonValue::Array(vec![]));
520                    arr.push(a.to_json());
521                }
522
523                JsonValue::Array(arr)
524            }
525
526            WireExpression::Date(timestamp) => JsonValue::Array(vec![
527                JsonValue::String("date".into()),
528                JsonValue::Number(Number::from_f64(*timestamp).unwrap_or_else(|| Number::from(0))), // Use 0 for invalid timestamps
529            ]),
530
531            WireExpression::Remap(plan) => {
532                JsonValue::Array(vec![JsonValue::String("remap".into()), plan.clone()])
533            }
534
535            WireExpression::CapRef(id) => JsonValue::Array(vec![
536                JsonValue::String("capref".into()),
537                JsonValue::Number(Number::from(*id)),
538            ]),
539
540            WireExpression::Call {
541                cap_id,
542                property_path,
543                args,
544            } => {
545                let mut arr = vec![
546                    JsonValue::String("call".into()),
547                    JsonValue::Number(Number::from(*cap_id)),
548                ];
549
550                let path_json: Vec<JsonValue> = property_path
551                    .iter()
552                    .map(|key| match key {
553                        PropertyKey::String(s) => JsonValue::String(s.clone()),
554                        PropertyKey::Number(n) => JsonValue::Number(Number::from(*n)),
555                    })
556                    .collect();
557                arr.push(JsonValue::Array(path_json));
558                arr.push(args.to_json());
559
560                JsonValue::Array(arr)
561            }
562        }
563    }
564}
565
566/// Parse a newline-delimited batch of messages
567pub fn parse_wire_batch(input: &str) -> Result<Vec<WireMessage>, String> {
568    debug!("Parsing wire batch, input length: {} chars", input.len());
569    let mut messages = Vec::new();
570    let mut line_num = 0;
571
572    for line in input.lines() {
573        line_num += 1;
574        let line = line.trim();
575        if line.is_empty() {
576            trace!("Skipping empty line {}", line_num);
577            continue;
578        }
579
580        trace!("Parsing line {}: {}", line_num, line);
581
582        let json: JsonValue = serde_json::from_str(line).map_err(|e| {
583            warn!("Failed to parse JSON on line {}: {}", line_num, e);
584            format!("Invalid JSON on line {}: {}", line_num, e)
585        })?;
586
587        let arr = json.as_array().ok_or_else(|| {
588            warn!("Line {} is not an array: {:?}", line_num, json);
589            format!("Message on line {} must be an array", line_num)
590        })?;
591
592        let msg = WireMessage::from_json_array(arr)?;
593        debug!(
594            "Successfully parsed message on line {}: {:?}",
595            line_num, msg
596        );
597        messages.push(msg);
598    }
599
600    debug!(
601        "Successfully parsed {} messages from wire batch",
602        messages.len()
603    );
604    Ok(messages)
605}
606
607/// Serialize messages to newline-delimited format
608pub fn serialize_wire_batch(messages: &[WireMessage]) -> String {
609    debug!("Serializing {} messages to wire format", messages.len());
610
611    let result = messages
612        .iter()
613        .enumerate()
614        .map(|(i, msg)| {
615            trace!("Serializing message {}: {:?}", i, msg);
616            let arr = msg.to_json_array();
617            serde_json::to_string(&arr).unwrap()
618        })
619        .collect::<Vec<_>>()
620        .join("\n");
621
622    debug!("Serialized wire batch: {} bytes", result.len());
623    result
624}
625
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an integer literal expression.
    fn int(n: i64) -> WireExpression {
        WireExpression::Number(serde_json::Number::from(n))
    }

    #[test]
    fn test_parse_push_message() {
        // Test the exact format the TypeScript client sends
        let input = r#"["push",["pipeline",0,["add"],[5,3]]]"#;
        let json: JsonValue = serde_json::from_str(input).unwrap();
        let arr = json.as_array().unwrap();
        let msg = WireMessage::from_json_array(arr).unwrap();

        match msg {
            WireMessage::Push(WireExpression::Pipeline {
                import_id,
                property_path,
                args,
            }) => {
                assert_eq!(import_id, 0);
                assert_eq!(property_path, Some(vec![PropertyKey::String("add".into())]));
                // Pin the parsed arguments, not merely their presence.
                assert_eq!(
                    args,
                    Some(Box::new(WireExpression::Array(vec![int(5), int(3)])))
                );
            }
            _ => panic!("Expected Push with Pipeline"),
        }
    }

    #[test]
    fn test_parse_pull_message() {
        let input = r#"["pull",1]"#;
        let json: JsonValue = serde_json::from_str(input).unwrap();
        let arr = json.as_array().unwrap();
        let msg = WireMessage::from_json_array(arr).unwrap();

        assert_eq!(msg, WireMessage::Pull(1));
    }

    #[test]
    fn test_parse_batch() {
        // Test exact TypeScript client batch format
        let input = r#"["push",["pipeline",0,["add"],[5,3]]]
["pull",1]"#;
        let messages = parse_wire_batch(input).unwrap();
        assert_eq!(messages.len(), 2);

        // Verify first message is push
        match &messages[0] {
            WireMessage::Push(_) => {}
            _ => panic!("Expected first message to be Push"),
        }

        // Verify second message is pull with ID 1
        match &messages[1] {
            WireMessage::Pull(id) => assert_eq!(*id, 1),
            _ => panic!("Expected second message to be Pull"),
        }
    }

    #[test]
    fn test_parse_batch_skips_blank_lines() {
        // Blank and whitespace-only lines between messages are ignored.
        let input = "\n[\"pull\",1]\n   \n[\"pull\",2]\n";
        let messages = parse_wire_batch(input).unwrap();
        assert_eq!(messages, vec![WireMessage::Pull(1), WireMessage::Pull(2)]);
    }

    #[test]
    fn test_unknown_message_type_is_rejected() {
        let json = serde_json::json!(["bogus", 1]);
        let err = WireMessage::from_json_array(json.as_array().unwrap()).unwrap_err();
        assert_eq!(err, "Unknown message type: bogus");
    }

    #[test]
    fn test_serialize_response() {
        // Test serializing responses like the server should
        let messages = vec![WireMessage::Resolve(1, int(8))];

        let output = serialize_wire_batch(&messages);
        assert_eq!(output, r#"["resolve",1,8]"#);
    }

    #[test]
    fn test_full_protocol_flow() {
        // Test the full push/pull/resolve flow

        // Client sends push and pull
        let client_batch = r#"["push",["pipeline",0,["add"],[5,3]]]
["pull",1]"#;
        let client_messages = parse_wire_batch(client_batch).unwrap();
        assert_eq!(client_messages.len(), 2);

        // Server should respond with resolve using the same ID from pull
        let server_response = WireMessage::Resolve(
            1, // Use the import ID from the pull
            int(8),
        );

        let response_str = serialize_wire_batch(&[server_response]);
        assert_eq!(response_str, r#"["resolve",1,8]"#);
    }

    #[test]
    fn test_array_escape_round_trip() {
        // Arrays are serialized wrapped in an outer array and unwrapped on parse.
        let expr = WireExpression::Array(vec![int(1), WireExpression::String("two".into())]);

        let json = expr.to_json();
        assert_eq!(json, serde_json::json!([[1, "two"]]));
        assert_eq!(WireExpression::from_json(&json).unwrap(), expr);
    }

    #[test]
    fn test_capref_wire_expression() {
        // Test CapRef parsing and serialization
        let input = r#"["capref",42]"#;
        let json: JsonValue = serde_json::from_str(input).unwrap();
        let expr = WireExpression::from_json(&json).unwrap();

        match expr {
            WireExpression::CapRef(id) => assert_eq!(id, 42),
            _ => panic!("Expected CapRef expression"),
        }

        // Test serialization
        let serialized = expr.to_json();
        let expected = serde_json::json!(["capref", 42]);
        assert_eq!(serialized, expected);
    }

    #[test]
    fn test_capability_passing_in_args() {
        // Test capability references passed as arguments
        let input = r#"["push",["pipeline",0,["method"],[["capref",5],"regular_arg"]]]"#;
        let json: JsonValue = serde_json::from_str(input).unwrap();
        let arr = json.as_array().unwrap();
        let msg = WireMessage::from_json_array(arr).unwrap();

        match msg {
            WireMessage::Push(WireExpression::Pipeline {
                import_id,
                property_path,
                args,
            }) => {
                assert_eq!(import_id, 0);
                assert_eq!(
                    property_path,
                    Some(vec![PropertyKey::String("method".into())])
                );

                // Missing args must fail the test rather than pass silently.
                let args_expr = args.expect("Expected pipeline args to be present");
                assert_eq!(
                    *args_expr,
                    WireExpression::Array(vec![
                        WireExpression::CapRef(5),
                        WireExpression::String("regular_arg".into()),
                    ])
                );
            }
            _ => panic!("Expected Push with Pipeline"),
        }
    }
}
773}