http_nu/
commands.rs

1use crate::response::{Response, ResponseBodyType};
2use nu_engine::command_prelude::*;
3use nu_protocol::{
4    ByteStream, ByteStreamType, Category, Config, PipelineData, PipelineMetadata, ShellError,
5    Signature, Span, SyntaxShape, Type, Value,
6};
7use std::cell::RefCell;
8use std::collections::HashMap;
9use std::io::Read;
10use std::path::PathBuf;
11use tokio::sync::oneshot;
12
13use minijinja::Environment;
14
15thread_local! {
16    pub static RESPONSE_TX: RefCell<Option<oneshot::Sender<Response>>> = const { RefCell::new(None) };
17}
18
19#[derive(Clone)]
20pub struct ResponseStartCommand;
21
22impl Default for ResponseStartCommand {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl ResponseStartCommand {
29    pub fn new() -> Self {
30        Self
31    }
32}
33
34impl Command for ResponseStartCommand {
35    fn name(&self) -> &str {
36        ".response"
37    }
38
39    fn description(&self) -> &str {
40        "Start an HTTP response with status and headers"
41    }
42
43    fn signature(&self) -> Signature {
44        Signature::build(".response")
45            .required(
46                "meta",
47                SyntaxShape::Record(vec![]), // Add empty vec argument
48                "response configuration with optional status and headers",
49            )
50            .input_output_types(vec![(Type::Nothing, Type::Nothing)])
51            .category(Category::Custom("http".into()))
52    }
53
54    fn run(
55        &self,
56        engine_state: &EngineState,
57        stack: &mut Stack,
58        call: &Call,
59        _input: PipelineData,
60    ) -> Result<PipelineData, ShellError> {
61        let meta: Value = call.req(engine_state, stack, 0)?;
62        let record = meta.as_record()?;
63
64        // Extract optional status, default to 200
65        let status = match record.get("status") {
66            Some(status_value) => status_value.as_int()? as u16,
67            None => 200,
68        };
69
70        // Extract headers
71        let headers = match record.get("headers") {
72            Some(headers_value) => {
73                let headers_record = headers_value.as_record()?;
74                let mut map = HashMap::new();
75                for (k, v) in headers_record.iter() {
76                    let header_value = match v {
77                        Value::String { val, .. } => {
78                            crate::response::HeaderValue::Single(val.clone())
79                        }
80                        Value::List { vals, .. } => {
81                            let strings: Vec<String> = vals
82                                .iter()
83                                .filter_map(|v| v.as_str().ok())
84                                .map(|s| s.to_string())
85                                .collect();
86                            crate::response::HeaderValue::Multiple(strings)
87                        }
88                        _ => {
89                            return Err(nu_protocol::ShellError::CantConvert {
90                                to_type: "string or list<string>".to_string(),
91                                from_type: v.get_type().to_string(),
92                                span: v.span(),
93                                help: Some(
94                                    "header values must be strings or lists of strings".to_string(),
95                                ),
96                            });
97                        }
98                    };
99                    map.insert(k.clone(), header_value);
100                }
101                map
102            }
103            None => HashMap::new(),
104        };
105
106        // Create response and send through channel
107        let response = Response {
108            status,
109            headers,
110            body_type: ResponseBodyType::Normal,
111        };
112
113        RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
114            if let Some(tx) = tx.borrow_mut().take() {
115                tx.send(response).map_err(|_| ShellError::GenericError {
116                    error: "Failed to send response".into(),
117                    msg: "Channel closed".into(),
118                    span: Some(call.head),
119                    help: None,
120                    inner: vec![],
121                })?;
122            }
123            Ok(())
124        })?;
125
126        Ok(PipelineData::Empty)
127    }
128}
129
130#[derive(Clone)]
131pub struct StaticCommand;
132
133impl Default for StaticCommand {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
139impl StaticCommand {
140    pub fn new() -> Self {
141        Self
142    }
143}
144
145impl Command for StaticCommand {
146    fn name(&self) -> &str {
147        ".static"
148    }
149
150    fn description(&self) -> &str {
151        "Serve static files from a directory"
152    }
153
154    fn signature(&self) -> Signature {
155        Signature::build(".static")
156            .required("root", SyntaxShape::String, "root directory path")
157            .required("path", SyntaxShape::String, "request path")
158            .named(
159                "fallback",
160                SyntaxShape::String,
161                "fallback file when request missing",
162                None,
163            )
164            .input_output_types(vec![(Type::Nothing, Type::Nothing)])
165            .category(Category::Custom("http".into()))
166    }
167
168    fn run(
169        &self,
170        engine_state: &EngineState,
171        stack: &mut Stack,
172        call: &Call,
173        _input: PipelineData,
174    ) -> Result<PipelineData, ShellError> {
175        let root: String = call.req(engine_state, stack, 0)?;
176        let path: String = call.req(engine_state, stack, 1)?;
177
178        let fallback: Option<String> = call.get_flag(engine_state, stack, "fallback")?;
179
180        let response = Response {
181            status: 200,
182            headers: HashMap::new(),
183            body_type: ResponseBodyType::Static {
184                root: PathBuf::from(root),
185                path,
186                fallback,
187            },
188        };
189
190        RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
191            if let Some(tx) = tx.borrow_mut().take() {
192                tx.send(response).map_err(|_| ShellError::GenericError {
193                    error: "Failed to send response".into(),
194                    msg: "Channel closed".into(),
195                    span: Some(call.head),
196                    help: None,
197                    inner: vec![],
198                })?;
199            }
200            Ok(())
201        })?;
202
203        Ok(PipelineData::Empty)
204    }
205}
206
207const LINE_ENDING: &str = "\n";
208
209#[derive(Clone)]
210pub struct ToSse;
211
212impl Command for ToSse {
213    fn name(&self) -> &str {
214        "to sse"
215    }
216
217    fn signature(&self) -> Signature {
218        Signature::build("to sse")
219            .input_output_types(vec![(Type::record(), Type::String)])
220            .category(Category::Formats)
221    }
222
223    fn description(&self) -> &str {
224        "Convert records into text/event-stream format"
225    }
226
227    fn search_terms(&self) -> Vec<&str> {
228        vec!["sse", "server", "event"]
229    }
230
231    fn examples(&self) -> Vec<Example<'_>> {
232        vec![Example {
233            description: "Convert a record into a server-sent event",
234            example: "{data: 'hello'} | to sse",
235            result: Some(Value::test_string("data: hello\n\n")),
236        }]
237    }
238
239    fn run(
240        &self,
241        engine_state: &EngineState,
242        stack: &mut Stack,
243        call: &Call,
244        input: PipelineData,
245    ) -> Result<PipelineData, ShellError> {
246        let head = call.head;
247        let config = stack.get_config(engine_state);
248        match input {
249            PipelineData::ListStream(stream, meta) => {
250                let span = stream.span();
251                let cfg = config.clone();
252                let iter = stream
253                    .into_iter()
254                    .map(move |val| event_to_string(&cfg, val));
255                let stream = ByteStream::from_result_iter(
256                    iter,
257                    span,
258                    engine_state.signals().clone(),
259                    ByteStreamType::String,
260                );
261                Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
262            }
263            PipelineData::Value(Value::List { vals, .. }, meta) => {
264                let cfg = config.clone();
265                let iter = vals.into_iter().map(move |val| event_to_string(&cfg, val));
266                let span = head;
267                let stream = ByteStream::from_result_iter(
268                    iter,
269                    span,
270                    engine_state.signals().clone(),
271                    ByteStreamType::String,
272                );
273                Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
274            }
275            PipelineData::Value(val, meta) => {
276                let out = event_to_string(&config, val)?;
277                Ok(
278                    Value::string(out, head)
279                        .into_pipeline_data_with_metadata(update_metadata(meta)),
280                )
281            }
282            PipelineData::Empty => Ok(PipelineData::Value(
283                Value::string(String::new(), head),
284                update_metadata(None),
285            )),
286            PipelineData::ByteStream(..) => Err(ShellError::TypeMismatch {
287                err_message: "expected record input".into(),
288                span: head,
289            }),
290        }
291    }
292}
293
294#[allow(clippy::result_large_err)]
295fn event_to_string(config: &Config, val: Value) -> Result<String, ShellError> {
296    let span = val.span();
297    let rec = match val {
298        Value::Record { val, .. } => val,
299        other => {
300            return Err(ShellError::TypeMismatch {
301                err_message: format!("expected record, got {}", other.get_type()),
302                span,
303            })
304        }
305    };
306    let mut out = String::new();
307    if let Some(id) = rec.get("id") {
308        out.push_str("id: ");
309        out.push_str(&id.to_expanded_string("", config));
310        out.push_str(LINE_ENDING);
311    }
312    if let Some(event) = rec.get("event") {
313        out.push_str("event: ");
314        out.push_str(&event.to_expanded_string("", config));
315        out.push_str(LINE_ENDING);
316    }
317    if let Some(data) = rec.get("data") {
318        let data_str = match data {
319            Value::String { val, .. } => val.clone(),
320            _ => {
321                let json_value =
322                    value_to_json(data, config).map_err(|err| ShellError::GenericError {
323                        error: err.to_string(),
324                        msg: "failed to serialize json".into(),
325                        span: Some(Span::unknown()),
326                        help: None,
327                        inner: vec![],
328                    })?;
329                serde_json::to_string(&json_value).map_err(|err| ShellError::GenericError {
330                    error: err.to_string(),
331                    msg: "failed to serialize json".into(),
332                    span: Some(Span::unknown()),
333                    help: None,
334                    inner: vec![],
335                })?
336            }
337        };
338        for line in data_str.lines() {
339            out.push_str("data: ");
340            out.push_str(line);
341            out.push_str(LINE_ENDING);
342        }
343    }
344    out.push_str(LINE_ENDING);
345    Ok(out)
346}
347
348fn value_to_json(val: &Value, config: &Config) -> serde_json::Result<serde_json::Value> {
349    Ok(match val {
350        Value::Bool { val, .. } => serde_json::Value::Bool(*val),
351        Value::Int { val, .. } => serde_json::Value::from(*val),
352        Value::Float { val, .. } => serde_json::Number::from_f64(*val)
353            .map(serde_json::Value::Number)
354            .unwrap_or(serde_json::Value::Null),
355        Value::String { val, .. } => serde_json::Value::String(val.clone()),
356        Value::List { vals, .. } => serde_json::Value::Array(
357            vals.iter()
358                .map(|v| value_to_json(v, config))
359                .collect::<Result<Vec<_>, _>>()?,
360        ),
361        Value::Record { val, .. } => {
362            let mut map = serde_json::Map::new();
363            for (k, v) in val.iter() {
364                map.insert(k.clone(), value_to_json(v, config)?);
365            }
366            serde_json::Value::Object(map)
367        }
368        Value::Nothing { .. } => serde_json::Value::Null,
369        other => serde_json::Value::String(other.to_expanded_string("", config)),
370    })
371}
372
373fn update_metadata(metadata: Option<PipelineMetadata>) -> Option<PipelineMetadata> {
374    metadata
375        .map(|md| md.with_content_type(Some("text/event-stream".into())))
376        .or_else(|| {
377            Some(PipelineMetadata::default().with_content_type(Some("text/event-stream".into())))
378        })
379}
380
381#[derive(Clone)]
382pub struct ReverseProxyCommand;
383
384impl Default for ReverseProxyCommand {
385    fn default() -> Self {
386        Self::new()
387    }
388}
389
390impl ReverseProxyCommand {
391    pub fn new() -> Self {
392        Self
393    }
394}
395
396impl Command for ReverseProxyCommand {
397    fn name(&self) -> &str {
398        ".reverse-proxy"
399    }
400
401    fn description(&self) -> &str {
402        "Forward HTTP requests to a backend server"
403    }
404
405    fn signature(&self) -> Signature {
406        Signature::build(".reverse-proxy")
407            .required("target_url", SyntaxShape::String, "backend URL to proxy to")
408            .optional(
409                "config",
410                SyntaxShape::Record(vec![]),
411                "optional configuration (headers, preserve_host, strip_prefix, query)",
412            )
413            .input_output_types(vec![(Type::Any, Type::Nothing)])
414            .category(Category::Custom("http".into()))
415    }
416
417    fn run(
418        &self,
419        engine_state: &EngineState,
420        stack: &mut Stack,
421        call: &Call,
422        input: PipelineData,
423    ) -> Result<PipelineData, ShellError> {
424        let target_url: String = call.req(engine_state, stack, 0)?;
425
426        // Convert input pipeline data to bytes for request body
427        let request_body = match input {
428            PipelineData::Empty => Vec::new(),
429            PipelineData::Value(value, _) => crate::response::value_to_bytes(value),
430            PipelineData::ByteStream(stream, _) => {
431                // Collect all bytes from the stream
432                let mut body_bytes = Vec::new();
433                if let Some(mut reader) = stream.reader() {
434                    loop {
435                        let mut buffer = vec![0; 8192];
436                        match reader.read(&mut buffer) {
437                            Ok(0) => break, // EOF
438                            Ok(n) => {
439                                buffer.truncate(n);
440                                body_bytes.extend_from_slice(&buffer);
441                            }
442                            Err(_) => break,
443                        }
444                    }
445                }
446                body_bytes
447            }
448            PipelineData::ListStream(stream, _) => {
449                // Convert list stream to JSON array
450                let items: Vec<_> = stream.into_iter().collect();
451                let json_value = serde_json::Value::Array(
452                    items
453                        .into_iter()
454                        .map(|v| crate::response::value_to_json(&v))
455                        .collect(),
456                );
457                serde_json::to_string(&json_value)
458                    .unwrap_or_default()
459                    .into_bytes()
460            }
461        };
462
463        // Parse optional config
464        let config = call.opt::<Value>(engine_state, stack, 1);
465
466        let mut headers = HashMap::new();
467        let mut preserve_host = true;
468        let mut strip_prefix: Option<String> = None;
469        let mut query: Option<HashMap<String, String>> = None;
470
471        if let Ok(Some(config_value)) = config {
472            if let Ok(record) = config_value.as_record() {
473                // Extract headers
474                if let Some(headers_value) = record.get("headers") {
475                    if let Ok(headers_record) = headers_value.as_record() {
476                        for (k, v) in headers_record.iter() {
477                            let header_value = match v {
478                                Value::String { val, .. } => {
479                                    crate::response::HeaderValue::Single(val.clone())
480                                }
481                                Value::List { vals, .. } => {
482                                    let strings: Vec<String> = vals
483                                        .iter()
484                                        .filter_map(|v| v.as_str().ok())
485                                        .map(|s| s.to_string())
486                                        .collect();
487                                    crate::response::HeaderValue::Multiple(strings)
488                                }
489                                _ => continue, // Skip non-string/non-list values
490                            };
491                            headers.insert(k.clone(), header_value);
492                        }
493                    }
494                }
495
496                // Extract preserve_host
497                if let Some(preserve_host_value) = record.get("preserve_host") {
498                    if let Ok(ph) = preserve_host_value.as_bool() {
499                        preserve_host = ph;
500                    }
501                }
502
503                // Extract strip_prefix
504                if let Some(strip_prefix_value) = record.get("strip_prefix") {
505                    if let Ok(prefix) = strip_prefix_value.as_str() {
506                        strip_prefix = Some(prefix.to_string());
507                    }
508                }
509
510                // Extract query
511                if let Some(query_value) = record.get("query") {
512                    if let Ok(query_record) = query_value.as_record() {
513                        let mut query_map = HashMap::new();
514                        for (k, v) in query_record.iter() {
515                            if let Ok(v_str) = v.as_str() {
516                                query_map.insert(k.clone(), v_str.to_string());
517                            }
518                        }
519                        query = Some(query_map);
520                    }
521                }
522            }
523        }
524
525        let response = Response {
526            status: 200,
527            headers: HashMap::new(),
528            body_type: ResponseBodyType::ReverseProxy {
529                target_url,
530                headers,
531                preserve_host,
532                strip_prefix,
533                request_body,
534                query,
535            },
536        };
537
538        RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
539            if let Some(tx) = tx.borrow_mut().take() {
540                tx.send(response).map_err(|_| ShellError::GenericError {
541                    error: "Failed to send response".into(),
542                    msg: "Channel closed".into(),
543                    span: Some(call.head),
544                    help: None,
545                    inner: vec![],
546                })?;
547            }
548            Ok(())
549        })?;
550
551        Ok(PipelineData::Empty)
552    }
553}
554
555#[derive(Clone)]
556pub struct MjCommand;
557
558impl Default for MjCommand {
559    fn default() -> Self {
560        Self::new()
561    }
562}
563
564impl MjCommand {
565    pub fn new() -> Self {
566        Self
567    }
568}
569
570impl Command for MjCommand {
571    fn name(&self) -> &str {
572        ".mj"
573    }
574
575    fn description(&self) -> &str {
576        "Render a minijinja template with context from input"
577    }
578
579    fn signature(&self) -> Signature {
580        Signature::build(".mj")
581            .optional("file", SyntaxShape::String, "template file path")
582            .named(
583                "inline",
584                SyntaxShape::String,
585                "inline template string",
586                Some('i'),
587            )
588            .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
589            .category(Category::Custom("http".into()))
590    }
591
592    fn run(
593        &self,
594        engine_state: &EngineState,
595        stack: &mut Stack,
596        call: &Call,
597        input: PipelineData,
598    ) -> Result<PipelineData, ShellError> {
599        let head = call.head;
600        let file: Option<String> = call.opt(engine_state, stack, 0)?;
601        let inline: Option<String> = call.get_flag(engine_state, stack, "inline")?;
602
603        // Get template source
604        let template_source = match (&file, &inline) {
605            (Some(_), Some(_)) => {
606                return Err(ShellError::GenericError {
607                    error: "Cannot specify both file and --inline".into(),
608                    msg: "use either a file path or --inline, not both".into(),
609                    span: Some(head),
610                    help: None,
611                    inner: vec![],
612                });
613            }
614            (None, None) => {
615                return Err(ShellError::GenericError {
616                    error: "No template specified".into(),
617                    msg: "provide a file path or use --inline".into(),
618                    span: Some(head),
619                    help: None,
620                    inner: vec![],
621                });
622            }
623            (Some(path), None) => {
624                std::fs::read_to_string(path).map_err(|e| ShellError::GenericError {
625                    error: format!("Failed to read template file: {e}"),
626                    msg: "could not read file".into(),
627                    span: Some(head),
628                    help: None,
629                    inner: vec![],
630                })?
631            }
632            (None, Some(tmpl)) => tmpl.clone(),
633        };
634
635        // Get context from input
636        let context = match input {
637            PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
638            PipelineData::Empty => minijinja::Value::from(()),
639            _ => {
640                return Err(ShellError::TypeMismatch {
641                    err_message: "expected record input".into(),
642                    span: head,
643                });
644            }
645        };
646
647        // Render template
648        let mut env = Environment::new();
649        env.add_template("template", &template_source)
650            .map_err(|e| ShellError::GenericError {
651                error: format!("Template parse error: {e}"),
652                msg: e.to_string(),
653                span: Some(head),
654                help: None,
655                inner: vec![],
656            })?;
657
658        let tmpl = env
659            .get_template("template")
660            .map_err(|e| ShellError::GenericError {
661                error: format!("Failed to get template: {e}"),
662                msg: e.to_string(),
663                span: Some(head),
664                help: None,
665                inner: vec![],
666            })?;
667
668        let rendered = tmpl
669            .render(&context)
670            .map_err(|e| ShellError::GenericError {
671                error: format!("Template render error: {e}"),
672                msg: e.to_string(),
673                span: Some(head),
674                help: None,
675                inner: vec![],
676            })?;
677
678        Ok(Value::string(rendered, head).into_pipeline_data())
679    }
680}
681
682/// Convert a nu_protocol::Value to a minijinja::Value via serde_json
683fn nu_value_to_minijinja(val: &Value) -> minijinja::Value {
684    let json = value_to_json(val, &Config::default()).unwrap_or(serde_json::Value::Null);
685    minijinja::Value::from_serialize(&json)
686}