Skip to main content

xs/nu/commands/
append_command_buffered.rs

1use std::sync::{Arc, Mutex};
2
3use nu_engine::CallExt;
4use nu_protocol::engine::{Call, Command, EngineState, Stack};
5use nu_protocol::shell_error::generic::GenericError;
6use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
7
8use crate::nu::util::value_to_json;
9use crate::store::{Frame, Store, TTL};
10
11#[derive(Clone)]
12pub struct AppendCommand {
13    output: Arc<Mutex<Vec<Frame>>>,
14    store: Store,
15}
16
17impl AppendCommand {
18    pub fn new(store: Store, output: Arc<Mutex<Vec<Frame>>>) -> Self {
19        Self { output, store }
20    }
21}
22
23impl Command for AppendCommand {
24    fn name(&self) -> &str {
25        ".append"
26    }
27
28    fn signature(&self) -> Signature {
29        Signature::build(".append")
30            .input_output_types(vec![(Type::Any, Type::Any)])
31            .required("topic", SyntaxShape::String, "this clip's topic")
32            .named(
33                "meta",
34                SyntaxShape::Record(vec![]),
35                "arbitrary metadata",
36                None,
37            )
38            .named(
39                "ttl",
40                SyntaxShape::String,
41                r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
42                None,
43            )
44            .category(Category::Experimental)
45    }
46
47    fn description(&self) -> &str {
48        "Writes its input to the CAS and buffers a frame for later batch processing. The frame will include the content hash, any provided metadata and TTL settings. Meant for use with actors that need to batch multiple appends."
49    }
50
51    fn run(
52        &self,
53        engine_state: &EngineState,
54        stack: &mut Stack,
55        call: &Call,
56        input: PipelineData,
57    ) -> Result<PipelineData, ShellError> {
58        let span = call.head;
59
60        let topic: String = call.req(engine_state, stack, 0)?;
61        let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
62        let ttl_str: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
63
64        let ttl = ttl_str
65            .map(|s| TTL::from_query(Some(&format!("ttl={s}"))))
66            .transpose()
67            .map_err(|e| ShellError::Generic(
68                GenericError::new("Invalid TTL format", e.to_string(), span)
69                    .with_help("TTL must be one of: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"),
70            ))?;
71
72        let input_value = input.into_value(span)?;
73
74        let hash = crate::nu::util::write_pipeline_to_cas(
75            PipelineData::Value(input_value.clone(), None),
76            &self.store,
77            span,
78        )
79        .map_err(|boxed| *boxed)?;
80
81        let frame = Frame::builder(topic)
82            .maybe_meta(meta.map(|v| value_to_json(&v)))
83            .maybe_hash(hash)
84            .maybe_ttl(ttl)
85            .build();
86
87        self.output.lock().unwrap().push(frame);
88
89        Ok(PipelineData::Empty)
90    }
91}