xs/nu/commands/
append_command_buffered.rs1use std::sync::{Arc, Mutex};
2
3use nu_engine::CallExt;
4use nu_protocol::engine::{Call, Command, EngineState, Stack};
5use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
6
7use crate::nu::util::value_to_json;
8use crate::store::{Frame, Store, TTL};
9
10#[derive(Clone)]
11pub struct AppendCommand {
12 output: Arc<Mutex<Vec<Frame>>>,
13 store: Store,
14}
15
16impl AppendCommand {
17 pub fn new(store: Store, output: Arc<Mutex<Vec<Frame>>>) -> Self {
18 Self { output, store }
19 }
20}
21
22impl Command for AppendCommand {
23 fn name(&self) -> &str {
24 ".append"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(".append")
29 .input_output_types(vec![(Type::Any, Type::Any)])
30 .required("topic", SyntaxShape::String, "this clip's topic")
31 .named(
32 "meta",
33 SyntaxShape::Record(vec![]),
34 "arbitrary metadata",
35 None,
36 )
37 .named(
38 "ttl",
39 SyntaxShape::String,
40 r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
41 None,
42 )
43 .category(Category::Experimental)
44 }
45
46 fn description(&self) -> &str {
47 "Writes its input to the CAS and buffers a frame for later batch processing. The frame will include the content hash, any provided metadata and TTL settings. Meant for use with handlers that need to batch multiple appends."
48 }
49
50 fn run(
51 &self,
52 engine_state: &EngineState,
53 stack: &mut Stack,
54 call: &Call,
55 input: PipelineData,
56 ) -> Result<PipelineData, ShellError> {
57 let span = call.head;
58
59 let topic: String = call.req(engine_state, stack, 0)?;
60 let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
61 let ttl_str: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
62
63 let ttl = ttl_str
64 .map(|s| TTL::from_query(Some(&format!("ttl={s}"))))
65 .transpose()
66 .map_err(|e| ShellError::GenericError {
67 error: "Invalid TTL format".into(),
68 msg: e.to_string(),
69 span: Some(span),
70 help: Some(
71 "TTL must be one of: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"
72 .into(),
73 ),
74 inner: vec![],
75 })?;
76
77 let input_value = input.into_value(span)?;
78
79 let hash = crate::nu::util::write_pipeline_to_cas(
80 PipelineData::Value(input_value.clone(), None),
81 &self.store,
82 span,
83 )
84 .map_err(|boxed| *boxed)?;
85
86 let frame = Frame::builder(topic)
87 .maybe_meta(meta.map(|v| value_to_json(&v)))
88 .maybe_hash(hash)
89 .maybe_ttl(ttl)
90 .build();
91
92 self.output.lock().unwrap().push(frame);
93
94 Ok(PipelineData::Empty)
95 }
96}