xs/nu/commands/
append_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
4
5use serde_json::Value as JsonValue;
6
7use crate::nu::util;
8use crate::store::{Frame, Store, TTL};
9
10#[derive(Clone)]
11pub struct AppendCommand {
12 store: Store,
13 base_meta: JsonValue,
14}
15
16impl AppendCommand {
17 pub fn new(store: Store, base_meta: JsonValue) -> Self {
18 Self { store, base_meta }
19 }
20}
21
22impl Command for AppendCommand {
23 fn name(&self) -> &str {
24 ".append"
25 }
26
27 fn signature(&self) -> Signature {
28 Signature::build(".append")
29 .input_output_types(vec![(Type::Any, Type::Any)])
30 .required("topic", SyntaxShape::String, "this clip's topic")
31 .named(
32 "meta",
33 SyntaxShape::Record(vec![]),
34 "arbitrary metadata",
35 None,
36 )
37 .named(
38 "ttl",
39 SyntaxShape::String,
40 r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
41 None,
42 )
43 .switch(
44 "with-timestamp",
45 "include timestamp extracted from frame ID",
46 None,
47 )
48 .category(Category::Experimental)
49 }
50
51 fn description(&self) -> &str {
52 "Writes its input to the CAS and then appends a frame with a hash of this content to the given topic on the stream."
53 }
54
55 fn run(
56 &self,
57 engine_state: &EngineState,
58 stack: &mut Stack,
59 call: &Call,
60 input: PipelineData,
61 ) -> Result<PipelineData, ShellError> {
62 let span = call.head;
63 let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
64
65 let store = self.store.clone();
66
67 let topic: String = call.req(engine_state, stack, 0)?;
68
69 let user_meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
71 let mut final_meta = self.base_meta.clone(); if let Some(user_value) = user_meta {
75 let user_json = util::value_to_json(&user_value);
76 if let JsonValue::Object(mut base_obj) = final_meta {
77 if let JsonValue::Object(user_obj) = user_json {
78 base_obj.extend(user_obj); final_meta = JsonValue::Object(base_obj);
80 } else {
81 return Err(ShellError::TypeMismatch {
82 err_message: "Meta must be a record".to_string(),
83 span: call.span(),
84 });
85 }
86 }
87 }
88
89 let ttl: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
90 let ttl = match ttl {
91 Some(ttl_str) => Some(TTL::from_query(Some(&format!("ttl={ttl_str}"))).map_err(
92 |e| ShellError::TypeMismatch {
93 err_message: format!("Invalid TTL value: {ttl_str}. {e}"),
94 span: call.span(),
95 },
96 )?),
97 None => None,
98 };
99
100 let hash = util::write_pipeline_to_cas(input, &store, span).map_err(|boxed| *boxed)?;
101
102 let frame = store.append(
103 Frame::builder(topic)
104 .maybe_hash(hash)
105 .meta(final_meta)
106 .maybe_ttl(ttl)
107 .build(),
108 )?;
109
110 Ok(PipelineData::Value(
111 util::frame_to_value(&frame, span, with_timestamp),
112 None,
113 ))
114 }
115}