xs/nu/commands/
append_command_buffered.rs1use std::sync::{Arc, Mutex};
2
3use nu_engine::CallExt;
4use nu_protocol::engine::{Call, Command, EngineState, Stack};
5use nu_protocol::shell_error::generic::GenericError;
6use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
7
8use crate::nu::util::value_to_json;
9use crate::store::{Frame, Store, TTL};
10
11#[derive(Clone)]
12pub struct AppendCommand {
13 output: Arc<Mutex<Vec<Frame>>>,
14 store: Store,
15}
16
17impl AppendCommand {
18 pub fn new(store: Store, output: Arc<Mutex<Vec<Frame>>>) -> Self {
19 Self { output, store }
20 }
21}
22
23impl Command for AppendCommand {
24 fn name(&self) -> &str {
25 ".append"
26 }
27
28 fn signature(&self) -> Signature {
29 Signature::build(".append")
30 .input_output_types(vec![(Type::Any, Type::Any)])
31 .required("topic", SyntaxShape::String, "this clip's topic")
32 .named(
33 "meta",
34 SyntaxShape::Record(vec![]),
35 "arbitrary metadata",
36 None,
37 )
38 .named(
39 "ttl",
40 SyntaxShape::String,
41 r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
42 None,
43 )
44 .category(Category::Experimental)
45 }
46
47 fn description(&self) -> &str {
48 "Writes its input to the CAS and buffers a frame for later batch processing. The frame will include the content hash, any provided metadata and TTL settings. Meant for use with actors that need to batch multiple appends."
49 }
50
51 fn run(
52 &self,
53 engine_state: &EngineState,
54 stack: &mut Stack,
55 call: &Call,
56 input: PipelineData,
57 ) -> Result<PipelineData, ShellError> {
58 let span = call.head;
59
60 let topic: String = call.req(engine_state, stack, 0)?;
61 let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
62 let ttl_str: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
63
64 let ttl = ttl_str
65 .map(|s| TTL::from_query(Some(&format!("ttl={s}"))))
66 .transpose()
67 .map_err(|e| ShellError::Generic(
68 GenericError::new("Invalid TTL format", e.to_string(), span)
69 .with_help("TTL must be one of: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"),
70 ))?;
71
72 let input_value = input.into_value(span)?;
73
74 let hash = crate::nu::util::write_pipeline_to_cas(
75 PipelineData::Value(input_value.clone(), None),
76 &self.store,
77 span,
78 )
79 .map_err(|boxed| *boxed)?;
80
81 let frame = Frame::builder(topic)
82 .maybe_meta(meta.map(|v| value_to_json(&v)))
83 .maybe_hash(hash)
84 .maybe_ttl(ttl)
85 .build();
86
87 self.output.lock().unwrap().push(frame);
88
89 Ok(PipelineData::Empty)
90 }
91}