xs/nu/commands/
append_command_buffered.rs

1use std::sync::{Arc, Mutex};
2
3use nu_engine::CallExt;
4use nu_protocol::engine::{Call, Command, EngineState, Stack};
5use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
6
7use crate::nu::util::value_to_json;
8use crate::store::{Frame, Store, TTL};
9
10#[derive(Clone)]
11pub struct AppendCommand {
12    output: Arc<Mutex<Vec<Frame>>>,
13    store: Store,
14}
15
16impl AppendCommand {
17    pub fn new(store: Store, output: Arc<Mutex<Vec<Frame>>>) -> Self {
18        Self { output, store }
19    }
20}
21
22impl Command for AppendCommand {
23    fn name(&self) -> &str {
24        ".append"
25    }
26
27    fn signature(&self) -> Signature {
28        Signature::build(".append")
29           .input_output_types(vec![(Type::Any, Type::Any)])
30           .required("topic", SyntaxShape::String, "this clip's topic")
31           .named(
32               "meta",
33               SyntaxShape::Record(vec![]),
34               "arbitrary metadata",
35               None,
36           )
37           .named(
38               "ttl",
39               SyntaxShape::String,
40               r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'head:<n>'"#,
41               None,
42           )
43           .named(
44               "context",
45               SyntaxShape::String,
46               "context ID (defaults to system context)",
47               None,
48           )
49           .category(Category::Experimental)
50    }
51
52    fn description(&self) -> &str {
53        "Writes its input to the CAS and buffers a frame for later batch processing. The frame will include the content hash, any provided metadata and TTL settings. Meant for use with handlers that need to batch multiple appends."
54    }
55
56    fn run(
57        &self,
58        engine_state: &EngineState,
59        stack: &mut Stack,
60        call: &Call,
61        input: PipelineData,
62    ) -> Result<PipelineData, ShellError> {
63        let span = call.head;
64
65        let topic: String = call.req(engine_state, stack, 0)?;
66        let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
67        let ttl_str: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
68
69        let ttl = ttl_str
70           .map(|s| TTL::from_query(Some(&format!("ttl={s}"))))
71           .transpose()
72           .map_err(|e| ShellError::GenericError {
73               error: "Invalid TTL format".into(),
74               msg: e.to_string(),
75               span: Some(span),
76               help: Some("TTL must be one of: 'forever', 'ephemeral', 'time:<milliseconds>', or 'head:<n>'".into()),
77               inner: vec![],
78           })?;
79
80        let input_value = input.into_value(span)?;
81
82        let hash = crate::nu::util::write_pipeline_to_cas(
83            PipelineData::Value(input_value.clone(), None),
84            &self.store,
85            span,
86        )
87        .map_err(|boxed| *boxed)?;
88
89        let context_str: Option<String> = call.get_flag(engine_state, stack, "context")?;
90        let context_id = if let Some(ctx) = context_str {
91            ctx.parse::<scru128::Scru128Id>()
92                .map_err(|e| ShellError::GenericError {
93                    error: "Invalid context ID".into(),
94                    msg: e.to_string(),
95                    span: Some(call.head),
96                    help: None,
97                    inner: vec![],
98                })?
99        } else {
100            crate::store::ZERO_CONTEXT
101        };
102
103        let frame = Frame::builder(topic, context_id)
104            .maybe_meta(meta.map(|v| value_to_json(&v)))
105            .maybe_hash(hash)
106            .maybe_ttl(ttl)
107            .build();
108
109        self.output.lock().unwrap().push(frame);
110
111        Ok(PipelineData::Empty)
112    }
113}