1use crate::response::{Response, ResponseBodyType};
2use nu_engine::command_prelude::*;
3use nu_protocol::{
4 ByteStream, ByteStreamType, Category, Config, PipelineData, PipelineMetadata, ShellError,
5 Signature, Span, SyntaxShape, Type, Value,
6};
7use std::cell::RefCell;
8use std::collections::HashMap;
9use std::io::Read;
10use std::path::PathBuf;
11use tokio::sync::oneshot;
12
13use minijinja::Environment;
14
thread_local! {
    /// Per-thread slot holding the one-shot sender through which a command in this
    /// file delivers its [`Response`].
    ///
    /// The slot starts out empty (`None`). The commands here `take()` the sender
    /// before sending, so each installed sender is used at most once.
    /// NOTE(review): the code that installs the sender is not visible in this file —
    /// presumably the request handler sets it before evaluating the script.
    pub static RESPONSE_TX: RefCell<Option<oneshot::Sender<Response>>> = const { RefCell::new(None) };
}
18
/// Stateless command object for the `.response` command.
///
/// `new()` and `default()` are interchangeable; both yield the unit value.
#[derive(Clone, Default)]
pub struct ResponseStartCommand;

impl ResponseStartCommand {
    /// Creates the command object.
    pub fn new() -> Self {
        Self
    }
}
33
34impl Command for ResponseStartCommand {
35 fn name(&self) -> &str {
36 ".response"
37 }
38
39 fn description(&self) -> &str {
40 "Start an HTTP response with status and headers"
41 }
42
43 fn signature(&self) -> Signature {
44 Signature::build(".response")
45 .required(
46 "meta",
47 SyntaxShape::Record(vec![]), "response configuration with optional status and headers",
49 )
50 .input_output_types(vec![(Type::Nothing, Type::Nothing)])
51 .category(Category::Custom("http".into()))
52 }
53
54 fn run(
55 &self,
56 engine_state: &EngineState,
57 stack: &mut Stack,
58 call: &Call,
59 _input: PipelineData,
60 ) -> Result<PipelineData, ShellError> {
61 let meta: Value = call.req(engine_state, stack, 0)?;
62 let record = meta.as_record()?;
63
64 let status = match record.get("status") {
66 Some(status_value) => status_value.as_int()? as u16,
67 None => 200,
68 };
69
70 let headers = match record.get("headers") {
72 Some(headers_value) => {
73 let headers_record = headers_value.as_record()?;
74 let mut map = HashMap::new();
75 for (k, v) in headers_record.iter() {
76 let header_value = match v {
77 Value::String { val, .. } => {
78 crate::response::HeaderValue::Single(val.clone())
79 }
80 Value::List { vals, .. } => {
81 let strings: Vec<String> = vals
82 .iter()
83 .filter_map(|v| v.as_str().ok())
84 .map(|s| s.to_string())
85 .collect();
86 crate::response::HeaderValue::Multiple(strings)
87 }
88 _ => {
89 return Err(nu_protocol::ShellError::CantConvert {
90 to_type: "string or list<string>".to_string(),
91 from_type: v.get_type().to_string(),
92 span: v.span(),
93 help: Some(
94 "header values must be strings or lists of strings".to_string(),
95 ),
96 });
97 }
98 };
99 map.insert(k.clone(), header_value);
100 }
101 map
102 }
103 None => HashMap::new(),
104 };
105
106 let response = Response {
108 status,
109 headers,
110 body_type: ResponseBodyType::Normal,
111 };
112
113 RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
114 if let Some(tx) = tx.borrow_mut().take() {
115 tx.send(response).map_err(|_| ShellError::GenericError {
116 error: "Failed to send response".into(),
117 msg: "Channel closed".into(),
118 span: Some(call.head),
119 help: None,
120 inner: vec![],
121 })?;
122 }
123 Ok(())
124 })?;
125
126 Ok(PipelineData::Empty)
127 }
128}
129
/// Stateless command object for the `.static` command.
///
/// `new()` and `default()` are interchangeable; both yield the unit value.
#[derive(Clone, Default)]
pub struct StaticCommand;

impl StaticCommand {
    /// Creates the command object.
    pub fn new() -> Self {
        Self
    }
}
144
145impl Command for StaticCommand {
146 fn name(&self) -> &str {
147 ".static"
148 }
149
150 fn description(&self) -> &str {
151 "Serve static files from a directory"
152 }
153
154 fn signature(&self) -> Signature {
155 Signature::build(".static")
156 .required("root", SyntaxShape::String, "root directory path")
157 .required("path", SyntaxShape::String, "request path")
158 .named(
159 "fallback",
160 SyntaxShape::String,
161 "fallback file when request missing",
162 None,
163 )
164 .input_output_types(vec![(Type::Nothing, Type::Nothing)])
165 .category(Category::Custom("http".into()))
166 }
167
168 fn run(
169 &self,
170 engine_state: &EngineState,
171 stack: &mut Stack,
172 call: &Call,
173 _input: PipelineData,
174 ) -> Result<PipelineData, ShellError> {
175 let root: String = call.req(engine_state, stack, 0)?;
176 let path: String = call.req(engine_state, stack, 1)?;
177
178 let fallback: Option<String> = call.get_flag(engine_state, stack, "fallback")?;
179
180 let response = Response {
181 status: 200,
182 headers: HashMap::new(),
183 body_type: ResponseBodyType::Static {
184 root: PathBuf::from(root),
185 path,
186 fallback,
187 },
188 };
189
190 RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
191 if let Some(tx) = tx.borrow_mut().take() {
192 tx.send(response).map_err(|_| ShellError::GenericError {
193 error: "Failed to send response".into(),
194 msg: "Channel closed".into(),
195 span: Some(call.head),
196 help: None,
197 inner: vec![],
198 })?;
199 }
200 Ok(())
201 })?;
202
203 Ok(PipelineData::Empty)
204 }
205}
206
/// Line terminator written after each field line and once more at the end of every
/// event produced by `event_to_string`.
const LINE_ENDING: &str = "\n";
208
209#[derive(Clone)]
210pub struct ToSse;
211
212impl Command for ToSse {
213 fn name(&self) -> &str {
214 "to sse"
215 }
216
217 fn signature(&self) -> Signature {
218 Signature::build("to sse")
219 .input_output_types(vec![(Type::record(), Type::String)])
220 .category(Category::Formats)
221 }
222
223 fn description(&self) -> &str {
224 "Convert records into text/event-stream format"
225 }
226
227 fn search_terms(&self) -> Vec<&str> {
228 vec!["sse", "server", "event"]
229 }
230
231 fn examples(&self) -> Vec<Example<'_>> {
232 vec![Example {
233 description: "Convert a record into a server-sent event",
234 example: "{data: 'hello'} | to sse",
235 result: Some(Value::test_string("data: hello\n\n")),
236 }]
237 }
238
239 fn run(
240 &self,
241 engine_state: &EngineState,
242 stack: &mut Stack,
243 call: &Call,
244 input: PipelineData,
245 ) -> Result<PipelineData, ShellError> {
246 let head = call.head;
247 let config = stack.get_config(engine_state);
248 match input {
249 PipelineData::ListStream(stream, meta) => {
250 let span = stream.span();
251 let cfg = config.clone();
252 let iter = stream
253 .into_iter()
254 .map(move |val| event_to_string(&cfg, val));
255 let stream = ByteStream::from_result_iter(
256 iter,
257 span,
258 engine_state.signals().clone(),
259 ByteStreamType::String,
260 );
261 Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
262 }
263 PipelineData::Value(Value::List { vals, .. }, meta) => {
264 let cfg = config.clone();
265 let iter = vals.into_iter().map(move |val| event_to_string(&cfg, val));
266 let span = head;
267 let stream = ByteStream::from_result_iter(
268 iter,
269 span,
270 engine_state.signals().clone(),
271 ByteStreamType::String,
272 );
273 Ok(PipelineData::ByteStream(stream, update_metadata(meta)))
274 }
275 PipelineData::Value(val, meta) => {
276 let out = event_to_string(&config, val)?;
277 Ok(
278 Value::string(out, head)
279 .into_pipeline_data_with_metadata(update_metadata(meta)),
280 )
281 }
282 PipelineData::Empty => Ok(PipelineData::Value(
283 Value::string(String::new(), head),
284 update_metadata(None),
285 )),
286 PipelineData::ByteStream(..) => Err(ShellError::TypeMismatch {
287 err_message: "expected record input".into(),
288 span: head,
289 }),
290 }
291 }
292}
293
294#[allow(clippy::result_large_err)]
295fn event_to_string(config: &Config, val: Value) -> Result<String, ShellError> {
296 let span = val.span();
297 let rec = match val {
298 Value::Record { val, .. } => val,
299 other => {
300 return Err(ShellError::TypeMismatch {
301 err_message: format!("expected record, got {}", other.get_type()),
302 span,
303 })
304 }
305 };
306 let mut out = String::new();
307 if let Some(id) = rec.get("id") {
308 out.push_str("id: ");
309 out.push_str(&id.to_expanded_string("", config));
310 out.push_str(LINE_ENDING);
311 }
312 if let Some(event) = rec.get("event") {
313 out.push_str("event: ");
314 out.push_str(&event.to_expanded_string("", config));
315 out.push_str(LINE_ENDING);
316 }
317 if let Some(data) = rec.get("data") {
318 let data_str = match data {
319 Value::String { val, .. } => val.clone(),
320 _ => {
321 let json_value =
322 value_to_json(data, config).map_err(|err| ShellError::GenericError {
323 error: err.to_string(),
324 msg: "failed to serialize json".into(),
325 span: Some(Span::unknown()),
326 help: None,
327 inner: vec![],
328 })?;
329 serde_json::to_string(&json_value).map_err(|err| ShellError::GenericError {
330 error: err.to_string(),
331 msg: "failed to serialize json".into(),
332 span: Some(Span::unknown()),
333 help: None,
334 inner: vec![],
335 })?
336 }
337 };
338 for line in data_str.lines() {
339 out.push_str("data: ");
340 out.push_str(line);
341 out.push_str(LINE_ENDING);
342 }
343 }
344 out.push_str(LINE_ENDING);
345 Ok(out)
346}
347
348fn value_to_json(val: &Value, config: &Config) -> serde_json::Result<serde_json::Value> {
349 Ok(match val {
350 Value::Bool { val, .. } => serde_json::Value::Bool(*val),
351 Value::Int { val, .. } => serde_json::Value::from(*val),
352 Value::Float { val, .. } => serde_json::Number::from_f64(*val)
353 .map(serde_json::Value::Number)
354 .unwrap_or(serde_json::Value::Null),
355 Value::String { val, .. } => serde_json::Value::String(val.clone()),
356 Value::List { vals, .. } => serde_json::Value::Array(
357 vals.iter()
358 .map(|v| value_to_json(v, config))
359 .collect::<Result<Vec<_>, _>>()?,
360 ),
361 Value::Record { val, .. } => {
362 let mut map = serde_json::Map::new();
363 for (k, v) in val.iter() {
364 map.insert(k.clone(), value_to_json(v, config)?);
365 }
366 serde_json::Value::Object(map)
367 }
368 Value::Nothing { .. } => serde_json::Value::Null,
369 other => serde_json::Value::String(other.to_expanded_string("", config)),
370 })
371}
372
373fn update_metadata(metadata: Option<PipelineMetadata>) -> Option<PipelineMetadata> {
374 metadata
375 .map(|md| md.with_content_type(Some("text/event-stream".into())))
376 .or_else(|| {
377 Some(PipelineMetadata::default().with_content_type(Some("text/event-stream".into())))
378 })
379}
380
/// Stateless command object for the `.reverse-proxy` command.
///
/// `new()` and `default()` are interchangeable; both yield the unit value.
#[derive(Clone, Default)]
pub struct ReverseProxyCommand;

impl ReverseProxyCommand {
    /// Creates the command object.
    pub fn new() -> Self {
        Self
    }
}
395
396impl Command for ReverseProxyCommand {
397 fn name(&self) -> &str {
398 ".reverse-proxy"
399 }
400
401 fn description(&self) -> &str {
402 "Forward HTTP requests to a backend server"
403 }
404
405 fn signature(&self) -> Signature {
406 Signature::build(".reverse-proxy")
407 .required("target_url", SyntaxShape::String, "backend URL to proxy to")
408 .optional(
409 "config",
410 SyntaxShape::Record(vec![]),
411 "optional configuration (headers, preserve_host, strip_prefix, query)",
412 )
413 .input_output_types(vec![(Type::Any, Type::Nothing)])
414 .category(Category::Custom("http".into()))
415 }
416
417 fn run(
418 &self,
419 engine_state: &EngineState,
420 stack: &mut Stack,
421 call: &Call,
422 input: PipelineData,
423 ) -> Result<PipelineData, ShellError> {
424 let target_url: String = call.req(engine_state, stack, 0)?;
425
426 let request_body = match input {
428 PipelineData::Empty => Vec::new(),
429 PipelineData::Value(value, _) => crate::response::value_to_bytes(value),
430 PipelineData::ByteStream(stream, _) => {
431 let mut body_bytes = Vec::new();
433 if let Some(mut reader) = stream.reader() {
434 loop {
435 let mut buffer = vec![0; 8192];
436 match reader.read(&mut buffer) {
437 Ok(0) => break, Ok(n) => {
439 buffer.truncate(n);
440 body_bytes.extend_from_slice(&buffer);
441 }
442 Err(_) => break,
443 }
444 }
445 }
446 body_bytes
447 }
448 PipelineData::ListStream(stream, _) => {
449 let items: Vec<_> = stream.into_iter().collect();
451 let json_value = serde_json::Value::Array(
452 items
453 .into_iter()
454 .map(|v| crate::response::value_to_json(&v))
455 .collect(),
456 );
457 serde_json::to_string(&json_value)
458 .unwrap_or_default()
459 .into_bytes()
460 }
461 };
462
463 let config = call.opt::<Value>(engine_state, stack, 1);
465
466 let mut headers = HashMap::new();
467 let mut preserve_host = true;
468 let mut strip_prefix: Option<String> = None;
469 let mut query: Option<HashMap<String, String>> = None;
470
471 if let Ok(Some(config_value)) = config {
472 if let Ok(record) = config_value.as_record() {
473 if let Some(headers_value) = record.get("headers") {
475 if let Ok(headers_record) = headers_value.as_record() {
476 for (k, v) in headers_record.iter() {
477 let header_value = match v {
478 Value::String { val, .. } => {
479 crate::response::HeaderValue::Single(val.clone())
480 }
481 Value::List { vals, .. } => {
482 let strings: Vec<String> = vals
483 .iter()
484 .filter_map(|v| v.as_str().ok())
485 .map(|s| s.to_string())
486 .collect();
487 crate::response::HeaderValue::Multiple(strings)
488 }
489 _ => continue, };
491 headers.insert(k.clone(), header_value);
492 }
493 }
494 }
495
496 if let Some(preserve_host_value) = record.get("preserve_host") {
498 if let Ok(ph) = preserve_host_value.as_bool() {
499 preserve_host = ph;
500 }
501 }
502
503 if let Some(strip_prefix_value) = record.get("strip_prefix") {
505 if let Ok(prefix) = strip_prefix_value.as_str() {
506 strip_prefix = Some(prefix.to_string());
507 }
508 }
509
510 if let Some(query_value) = record.get("query") {
512 if let Ok(query_record) = query_value.as_record() {
513 let mut query_map = HashMap::new();
514 for (k, v) in query_record.iter() {
515 if let Ok(v_str) = v.as_str() {
516 query_map.insert(k.clone(), v_str.to_string());
517 }
518 }
519 query = Some(query_map);
520 }
521 }
522 }
523 }
524
525 let response = Response {
526 status: 200,
527 headers: HashMap::new(),
528 body_type: ResponseBodyType::ReverseProxy {
529 target_url,
530 headers,
531 preserve_host,
532 strip_prefix,
533 request_body,
534 query,
535 },
536 };
537
538 RESPONSE_TX.with(|tx| -> Result<_, ShellError> {
539 if let Some(tx) = tx.borrow_mut().take() {
540 tx.send(response).map_err(|_| ShellError::GenericError {
541 error: "Failed to send response".into(),
542 msg: "Channel closed".into(),
543 span: Some(call.head),
544 help: None,
545 inner: vec![],
546 })?;
547 }
548 Ok(())
549 })?;
550
551 Ok(PipelineData::Empty)
552 }
553}
554
/// Stateless command object for the `.mj` command.
///
/// `new()` and `default()` are interchangeable; both yield the unit value.
#[derive(Clone, Default)]
pub struct MjCommand;

impl MjCommand {
    /// Creates the command object.
    pub fn new() -> Self {
        Self
    }
}
569
570impl Command for MjCommand {
571 fn name(&self) -> &str {
572 ".mj"
573 }
574
575 fn description(&self) -> &str {
576 "Render a minijinja template with context from input"
577 }
578
579 fn signature(&self) -> Signature {
580 Signature::build(".mj")
581 .optional("file", SyntaxShape::String, "template file path")
582 .named(
583 "inline",
584 SyntaxShape::String,
585 "inline template string",
586 Some('i'),
587 )
588 .input_output_types(vec![(Type::Record(vec![].into()), Type::String)])
589 .category(Category::Custom("http".into()))
590 }
591
592 fn run(
593 &self,
594 engine_state: &EngineState,
595 stack: &mut Stack,
596 call: &Call,
597 input: PipelineData,
598 ) -> Result<PipelineData, ShellError> {
599 let head = call.head;
600 let file: Option<String> = call.opt(engine_state, stack, 0)?;
601 let inline: Option<String> = call.get_flag(engine_state, stack, "inline")?;
602
603 let template_source = match (&file, &inline) {
605 (Some(_), Some(_)) => {
606 return Err(ShellError::GenericError {
607 error: "Cannot specify both file and --inline".into(),
608 msg: "use either a file path or --inline, not both".into(),
609 span: Some(head),
610 help: None,
611 inner: vec![],
612 });
613 }
614 (None, None) => {
615 return Err(ShellError::GenericError {
616 error: "No template specified".into(),
617 msg: "provide a file path or use --inline".into(),
618 span: Some(head),
619 help: None,
620 inner: vec![],
621 });
622 }
623 (Some(path), None) => {
624 std::fs::read_to_string(path).map_err(|e| ShellError::GenericError {
625 error: format!("Failed to read template file: {e}"),
626 msg: "could not read file".into(),
627 span: Some(head),
628 help: None,
629 inner: vec![],
630 })?
631 }
632 (None, Some(tmpl)) => tmpl.clone(),
633 };
634
635 let context = match input {
637 PipelineData::Value(val, _) => nu_value_to_minijinja(&val),
638 PipelineData::Empty => minijinja::Value::from(()),
639 _ => {
640 return Err(ShellError::TypeMismatch {
641 err_message: "expected record input".into(),
642 span: head,
643 });
644 }
645 };
646
647 let mut env = Environment::new();
649 env.add_template("template", &template_source)
650 .map_err(|e| ShellError::GenericError {
651 error: format!("Template parse error: {e}"),
652 msg: e.to_string(),
653 span: Some(head),
654 help: None,
655 inner: vec![],
656 })?;
657
658 let tmpl = env
659 .get_template("template")
660 .map_err(|e| ShellError::GenericError {
661 error: format!("Failed to get template: {e}"),
662 msg: e.to_string(),
663 span: Some(head),
664 help: None,
665 inner: vec![],
666 })?;
667
668 let rendered = tmpl
669 .render(&context)
670 .map_err(|e| ShellError::GenericError {
671 error: format!("Template render error: {e}"),
672 msg: e.to_string(),
673 span: Some(head),
674 help: None,
675 inner: vec![],
676 })?;
677
678 Ok(Value::string(rendered, head).into_pipeline_data())
679 }
680}
681
682fn nu_value_to_minijinja(val: &Value) -> minijinja::Value {
684 let json = value_to_json(val, &Config::default()).unwrap_or(serde_json::Value::Null);
685 minijinja::Value::from_serialize(&json)
686}