xs/nu/commands/
append_command_buffered.rs

1use std::sync::{Arc, Mutex};
2
3use nu_engine::CallExt;
4use nu_protocol::engine::{Call, Command, EngineState, Stack};
5use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
6
7use crate::nu::util::value_to_json;
8use crate::store::{Frame, Store, TTL};
9
10#[derive(Clone)]
11pub struct AppendCommand {
12    output: Arc<Mutex<Vec<Frame>>>,
13    store: Store,
14}
15
16impl AppendCommand {
17    pub fn new(store: Store, output: Arc<Mutex<Vec<Frame>>>) -> Self {
18        Self { output, store }
19    }
20}
21
22impl Command for AppendCommand {
23    fn name(&self) -> &str {
24        ".append"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".append")
29            .input_output_types(vec![(Type::Any, Type::Any)])
30            .required("topic", SyntaxShape::String, "this clip's topic")
31            .named(
32                "meta",
33                SyntaxShape::Record(vec![]),
34                "arbitrary metadata",
35                None,
36            )
37            .named(
38                "ttl",
39                SyntaxShape::String,
40                r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
41                None,
42            )
43            .category(Category::Experimental)
44    }
45
46    fn description(&self) -> &str {
47        "Writes its input to the CAS and buffers a frame for later batch processing. The frame will include the content hash, any provided metadata and TTL settings. Meant for use with handlers that need to batch multiple appends."
48    }
49
50    fn run(
51        &self,
52        engine_state: &EngineState,
53        stack: &mut Stack,
54        call: &Call,
55        input: PipelineData,
56    ) -> Result<PipelineData, ShellError> {
57        let span = call.head;
58
59        let topic: String = call.req(engine_state, stack, 0)?;
60        let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
61        let ttl_str: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
62
63        let ttl = ttl_str
64            .map(|s| TTL::from_query(Some(&format!("ttl={s}"))))
65            .transpose()
66            .map_err(|e| ShellError::GenericError {
67                error: "Invalid TTL format".into(),
68                msg: e.to_string(),
69                span: Some(span),
70                help: Some(
71                    "TTL must be one of: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"
72                        .into(),
73                ),
74                inner: vec![],
75            })?;
76
77        let input_value = input.into_value(span)?;
78
79        let hash = crate::nu::util::write_pipeline_to_cas(
80            PipelineData::Value(input_value.clone(), None),
81            &self.store,
82            span,
83        )
84        .map_err(|boxed| *boxed)?;
85
86        let frame = Frame::builder(topic)
87            .maybe_meta(meta.map(|v| value_to_json(&v)))
88            .maybe_hash(hash)
89            .maybe_ttl(ttl)
90            .build();
91
92        self.output.lock().unwrap().push(frame);
93
94        Ok(PipelineData::Empty)
95    }
96}