Skip to main content

xs/nu/commands/
append_command.rs

1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
4
5use serde_json::Value as JsonValue;
6
7use crate::nu::util;
8use crate::store::{Frame, Store, TTL};
9
/// Env var carrying the base metadata stamped on every frame a runner appends
/// (`service_id`, `{action_id, frame_id}`, ...), as a JSON object string.
/// Injecting it per instance keeps `.append` instance-independent, so one decl
/// can be registered on a prepared engine and reused across spawns and restarts.
///
/// `.append` reads this variable on every call. A missing variable, a value
/// that cannot be coerced to a string, invalid JSON, or JSON that is not an
/// object is treated as empty base metadata rather than reported as an error.
pub const APPEND_META_ENV: &str = "XS_APPEND_META";
15
/// Nushell `.append` command: appends a frame to a topic in the backing store.
#[derive(Clone)]
pub struct AppendCommand {
    /// Store the command writes pipeline content to and appends frames to.
    store: Store,
}
20
21impl AppendCommand {
22    pub fn new(store: Store) -> Self {
23        Self { store }
24    }
25}
26
27impl Command for AppendCommand {
28    fn name(&self) -> &str {
29        ".append"
30    }
31
32    fn signature(&self) -> Signature {
33        Signature::build(".append")
34            .input_output_types(vec![(Type::Any, Type::Any)])
35            .required("topic", SyntaxShape::String, "this clip's topic")
36            .named(
37                "meta",
38                SyntaxShape::Record(vec![]),
39                "arbitrary metadata",
40                None,
41            )
42            .named(
43                "ttl",
44                SyntaxShape::String,
45                r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
46                None,
47            )
48            .switch(
49                "with-timestamp",
50                "include timestamp extracted from frame ID",
51                None,
52            )
53            .category(Category::Experimental)
54    }
55
56    fn description(&self) -> &str {
57        "Writes its input to the CAS and then appends a frame with a hash of this content to the given topic on the stream."
58    }
59
60    fn run(
61        &self,
62        engine_state: &EngineState,
63        stack: &mut Stack,
64        call: &Call,
65        input: PipelineData,
66    ) -> Result<PipelineData, ShellError> {
67        let span = call.head;
68        let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
69
70        let store = self.store.clone();
71
72        let topic: String = call.req(engine_state, stack, 0)?;
73
74        // Base metadata is injected per instance via $env; absent or malformed
75        // resolves to an empty object.
76        let mut base_obj = stack
77            .get_env_var(engine_state, APPEND_META_ENV)
78            .and_then(|v| v.coerce_string().ok())
79            .and_then(|s| serde_json::from_str::<JsonValue>(&s).ok())
80            .and_then(|j| match j {
81                JsonValue::Object(m) => Some(m),
82                _ => None,
83            })
84            .unwrap_or_default();
85
86        // Merge user-supplied metadata on top of the base.
87        let user_meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
88        if let Some(user_value) = user_meta {
89            match util::value_to_json(&user_value) {
90                JsonValue::Object(user_obj) => base_obj.extend(user_obj),
91                _ => {
92                    return Err(ShellError::TypeMismatch {
93                        err_message: "Meta must be a record".to_string(),
94                        span: call.span(),
95                    });
96                }
97            }
98        }
99        let final_meta = JsonValue::Object(base_obj);
100
101        let ttl: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
102        let ttl = match ttl {
103            Some(ttl_str) => Some(TTL::from_query(Some(&format!("ttl={ttl_str}"))).map_err(
104                |e| ShellError::TypeMismatch {
105                    err_message: format!("Invalid TTL value: {ttl_str}. {e}"),
106                    span: call.span(),
107                },
108            )?),
109            None => None,
110        };
111
112        let hash = util::write_pipeline_to_cas(input, &store, span).map_err(|boxed| *boxed)?;
113
114        let frame = store.append(
115            Frame::builder(topic)
116                .maybe_hash(hash)
117                .meta(final_meta)
118                .maybe_ttl(ttl)
119                .build(),
120        )?;
121
122        Ok(PipelineData::Value(
123            util::frame_to_value(&frame, span, with_timestamp),
124            None,
125        ))
126    }
127}