xs/nu/commands/
append_command.rs1use nu_engine::CallExt;
2use nu_protocol::engine::{Call, Command, EngineState, Stack};
3use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};
4
5use serde_json::Value as JsonValue;
6
7use crate::nu::util;
8use crate::store::{Frame, Store, TTL};
9
10pub const APPEND_META_ENV: &str = "XS_APPEND_META";
15
16#[derive(Clone)]
17pub struct AppendCommand {
18 store: Store,
19}
20
21impl AppendCommand {
22 pub fn new(store: Store) -> Self {
23 Self { store }
24 }
25}
26
27impl Command for AppendCommand {
28 fn name(&self) -> &str {
29 ".append"
30 }
31
32 fn signature(&self) -> Signature {
33 Signature::build(".append")
34 .input_output_types(vec![(Type::Any, Type::Any)])
35 .required("topic", SyntaxShape::String, "this clip's topic")
36 .named(
37 "meta",
38 SyntaxShape::Record(vec![]),
39 "arbitrary metadata",
40 None,
41 )
42 .named(
43 "ttl",
44 SyntaxShape::String,
45 r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'last:<n>'"#,
46 None,
47 )
48 .switch(
49 "with-timestamp",
50 "include timestamp extracted from frame ID",
51 None,
52 )
53 .category(Category::Experimental)
54 }
55
56 fn description(&self) -> &str {
57 "Writes its input to the CAS and then appends a frame with a hash of this content to the given topic on the stream."
58 }
59
60 fn run(
61 &self,
62 engine_state: &EngineState,
63 stack: &mut Stack,
64 call: &Call,
65 input: PipelineData,
66 ) -> Result<PipelineData, ShellError> {
67 let span = call.head;
68 let with_timestamp = call.has_flag(engine_state, stack, "with-timestamp")?;
69
70 let store = self.store.clone();
71
72 let topic: String = call.req(engine_state, stack, 0)?;
73
74 let mut base_obj = stack
77 .get_env_var(engine_state, APPEND_META_ENV)
78 .and_then(|v| v.coerce_string().ok())
79 .and_then(|s| serde_json::from_str::<JsonValue>(&s).ok())
80 .and_then(|j| match j {
81 JsonValue::Object(m) => Some(m),
82 _ => None,
83 })
84 .unwrap_or_default();
85
86 let user_meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
88 if let Some(user_value) = user_meta {
89 match util::value_to_json(&user_value) {
90 JsonValue::Object(user_obj) => base_obj.extend(user_obj),
91 _ => {
92 return Err(ShellError::TypeMismatch {
93 err_message: "Meta must be a record".to_string(),
94 span: call.span(),
95 });
96 }
97 }
98 }
99 let final_meta = JsonValue::Object(base_obj);
100
101 let ttl: Option<String> = call.get_flag(engine_state, stack, "ttl")?;
102 let ttl = match ttl {
103 Some(ttl_str) => Some(TTL::from_query(Some(&format!("ttl={ttl_str}"))).map_err(
104 |e| ShellError::TypeMismatch {
105 err_message: format!("Invalid TTL value: {ttl_str}. {e}"),
106 span: call.span(),
107 },
108 )?),
109 None => None,
110 };
111
112 let hash = util::write_pipeline_to_cas(input, &store, span).map_err(|boxed| *boxed)?;
113
114 let frame = store.append(
115 Frame::builder(topic)
116 .maybe_hash(hash)
117 .meta(final_meta)
118 .maybe_ttl(ttl)
119 .build(),
120 )?;
121
122 Ok(PipelineData::Value(
123 util::frame_to_value(&frame, span, with_timestamp),
124 None,
125 ))
126 }
127}