xs/nu/commands/
append_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
4
5use serde_json::Value as JsonValue;
6
7use crate::nu::util;
8use crate::store::{Frame, Store, TTL};
9
10#[derive(Clone)]
11pub struct AppendCommand {
12    store: Store,
13    base_meta: JsonValue,
14}
15
16impl AppendCommand {
17    pub fn new(store: Store, base_meta: JsonValue) -> Self {
18        Self { store, base_meta }
19    }
20}
21
22impl Command for AppendCommand {
23    fn name(&self) -> &str {
24        ".append"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".append")
29            .input_output_types(vec![(Type::Any, Type::Any)])
30            .required("topic", SyntaxShape::String, "this clip's topic")
31            .named(
32                "meta",
33                SyntaxShape::Record(vec![]),
34                "arbitrary metadata",
35                None,
36            )
37            .named(
38                "ttl",
39                SyntaxShape::String,
40                r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
41                None,
42            )
43            .category(Category::Experimental)
44    }
45
46    fn description(&self) -> &str {
47        "Writes its input to the CAS and then appends a frame with a hash of this content to the given topic on the stream."
48    }
49
50    fn run(
51        &self,
52        engine_state: &EngineState,
53        stack: &mut Stack,
54        call: &Call,
55        input: PipelineData,
56    ) -> Result<PipelineData, ShellError> {
57        let span = call.head;
58
59        let store = self.store.clone();
60
61        let topic: String = call.req(engine_state, stack, 0)?;
62
63        // Get user-supplied metadata and convert to JSON
64        let user_meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
65        let mut final_meta = self.base_meta.clone(); // Start with base metadata
66
67        // Merge user metadata if provided
68        if let Some(user_value) = user_meta {
69            let user_json = util::value_to_json(&user_value);
70            if let JsonValue::Object(mut base_obj) = final_meta {
71                if let JsonValue::Object(user_obj) = user_json {
72                    base_obj.extend(user_obj); // Merge user metadata into base
73                    final_meta = JsonValue::Object(base_obj);
74                } else {
75                    return Err(ShellError::TypeMismatch {
76                        err_message: "Meta must be a record".to_string(),
77                        span: call.span(),
78                    });
79                }
80            }
81        }
82
83        let ttl: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
84        let ttl = match ttl {
85            Some(ttl_str) => Some(TTL::from_query(Some(&format!("ttl={ttl_str}"))).map_err(
86                |e| ShellError::TypeMismatch {
87                    err_message: format!("Invalid TTL value: {ttl_str}. {e}"),
88                    span: call.span(),
89                },
90            )?),
91            None => None,
92        };
93
94        let hash = util::write_pipeline_to_cas(input, &store, span).map_err(|boxed| *boxed)?;
95
96        let frame = store.append(
97            Frame::builder(topic)
98                .maybe_hash(hash)
99                .meta(final_meta)
100                .maybe_ttl(ttl)
101                .build(),
102        )?;
103
104        Ok(PipelineData::Value(
105            util::frame_to_value(&frame, span),
106            None,
107        ))
108    }
109}