Skip to main content

xs/nu/commands/
append_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
4
5use serde_json::Value as JsonValue;
6
7use crate::nu::util;
8use crate::store::{Frame, Store, TTL};
9
/// State behind the `.append` Nushell command.
#[derive(Clone)]
pub struct AppendCommand {
    /// Store that receives the pipeline content and the appended frame.
    store: Store,
    /// Metadata every appended frame starts from; a record passed via `--meta`
    /// is merged on top of it.
    base_meta: JsonValue,
}
15
16impl AppendCommand {
17    pub fn new(store: Store, base_meta: JsonValue) -> Self {
18        Self { store, base_meta }
19    }
20}
21
22impl Command for AppendCommand {
23    fn name(&self) -> &str {
24        ".append"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".append")
29            .input_output_types(vec![(Type::Any, Type::Any)])
30            .required("topic", SyntaxShape::String, "this clip's topic")
31            .named(
32                "meta",
33                SyntaxShape::Record(vec![]),
34                "arbitrary metadata",
35                None,
36            )
37            .named(
38                "ttl",
39                SyntaxShape::String,
40                r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
41                None,
42            )
43            .switch(
44                "with-timestamp",
45                "include timestamp extracted from frame ID",
46                None,
47            )
48            .category(Category::Experimental)
49    }
50
51    fn description(&self) -> &str {
52        "Writes its input to the CAS and then appends a frame with a hash of this content to the given topic on the stream."
53    }
54
55    fn run(
56        &self,
57        engine_state: &EngineState,
58        stack: &mut Stack,
59        call: &Call,
60        input: PipelineData,
61    ) -> Result<PipelineData, ShellError> {
62        let span = call.head;
63        let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
64
65        let store = self.store.clone();
66
67        let topic: String = call.req(engine_state, stack, 0)?;
68
69        // Get user-supplied metadata and convert to JSON
70        let user_meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
71        let mut final_meta = self.base_meta.clone(); // Start with base metadata
72
73        // Merge user metadata if provided
74        if let Some(user_value) = user_meta {
75            let user_json = util::value_to_json(&user_value);
76            if let JsonValue::Object(mut base_obj) = final_meta {
77                if let JsonValue::Object(user_obj) = user_json {
78                    base_obj.extend(user_obj); // Merge user metadata into base
79                    final_meta = JsonValue::Object(base_obj);
80                } else {
81                    return Err(ShellError::TypeMismatch {
82                        err_message: "Meta must be a record".to_string(),
83                        span: call.span(),
84                    });
85                }
86            }
87        }
88
89        let ttl: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
90        let ttl = match ttl {
91            Some(ttl_str) => Some(TTL::from_query(Some(&format!("ttl={ttl_str}"))).map_err(
92                |e| ShellError::TypeMismatch {
93                    err_message: format!("Invalid TTL value: {ttl_str}. {e}"),
94                    span: call.span(),
95                },
96            )?),
97            None => None,
98        };
99
100        let hash = util::write_pipeline_to_cas(input, &store, span).map_err(|boxed| *boxed)?;
101
102        let frame = store.append(
103            Frame::builder(topic)
104                .maybe_hash(hash)
105                .meta(final_meta)
106                .maybe_ttl(ttl)
107                .build(),
108        )?;
109
110        Ok(PipelineData::Value(
111            util::frame_to_value(&frame, span, with_timestamp),
112            None,
113        ))
114    }
115}