1use crate::commands::RESPONSE_TX;
2use crate::request::{request_to_value, Request};
3use crate::response::{value_to_bytes, Response, ResponseBodyType, ResponseTransport};
4use nu_protocol::{
5 engine::{Job, ThreadJob},
6 PipelineData, Value,
7};
8use std::io::Read;
9use std::sync::{mpsc, Arc};
10use tokio::sync::{mpsc as tokio_mpsc, oneshot};
11
12type BoxError = Box<dyn std::error::Error + Send + Sync>;
13
14pub fn spawn_eval_thread(
15 engine: Arc<crate::Engine>,
16 request: Request,
17 stream: nu_protocol::ByteStream,
18) -> (
19 oneshot::Receiver<Response>,
20 oneshot::Receiver<(Option<String>, ResponseTransport)>,
21) {
22 let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
23 let (body_tx, body_rx) = tokio::sync::oneshot::channel();
24
25 fn inner(
26 engine: Arc<crate::Engine>,
27 request: Request,
28 stream: nu_protocol::ByteStream,
29 meta_tx: oneshot::Sender<Response>,
30 body_tx: oneshot::Sender<(Option<String>, ResponseTransport)>,
31 ) -> Result<(), BoxError> {
32 RESPONSE_TX.with(|tx| {
33 *tx.borrow_mut() = Some(meta_tx);
34 });
35 let result = engine.run_closure(
36 request_to_value(&request, nu_protocol::Span::unknown()),
37 stream.into(),
38 );
39 RESPONSE_TX.with(|tx| {
41 let _ = tx.borrow_mut().take(); });
43 let output = result?;
44 let inferred_content_type = match &output {
45 PipelineData::Value(Value::Record { val, .. }, meta)
46 if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
47 {
48 if val.get("__html").is_some() {
49 Some("text/html; charset=utf-8".to_string())
50 } else {
51 Some("application/json".to_string())
52 }
53 }
54 PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
55 meta.as_ref().and_then(|m| m.content_type.clone())
56 }
57 _ => None,
58 };
59 match output {
60 PipelineData::Empty => {
61 let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
62 Ok(())
63 }
64 PipelineData::Value(Value::Nothing { .. }, _) => {
65 let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
66 Ok(())
67 }
68 PipelineData::Value(value, _) => {
69 let _ = body_tx.send((
70 inferred_content_type,
71 ResponseTransport::Full(value_to_bytes(value)),
72 ));
73 Ok(())
74 }
75 PipelineData::ListStream(stream, _) => {
76 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
77 let _ = body_tx.send((inferred_content_type, ResponseTransport::Stream(stream_rx)));
78 for value in stream.into_inner() {
79 if stream_tx.blocking_send(value_to_bytes(value)).is_err() {
80 break;
81 }
82 }
83 Ok(())
84 }
85 PipelineData::ByteStream(stream, meta) => {
86 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
87 let _ = body_tx.send((
88 meta.as_ref().and_then(|m| m.content_type.clone()),
89 ResponseTransport::Stream(stream_rx),
90 ));
91 let mut reader = stream
92 .reader()
93 .ok_or_else(|| "ByteStream has no reader".to_string())?;
94 let mut buf = vec![0; 8192];
95 loop {
96 match reader.read(&mut buf) {
97 Ok(0) => break, Ok(n) => {
99 if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
100 break;
101 }
102 }
103 Err(err) => return Err(err.into()),
104 }
105 }
106 Ok(())
107 }
108 }
109 }
110
111 let (sender, _receiver) = mpsc::channel();
113 let signals = engine.state.signals().clone();
114 let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
115
116 let job_id = {
118 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
119 jobs.add_job(Job::Thread(job.clone()))
120 };
121
122 std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
123 let mut meta_tx_opt = Some(meta_tx);
124 let mut body_tx_opt = Some(body_tx);
125
126 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
129 let mut local_engine = (*engine).clone();
130 local_engine.state.current_job.background_thread_job = Some(job);
131
132 inner(
136 Arc::new(local_engine),
137 request,
138 stream,
139 meta_tx_opt.take().unwrap(),
140 body_tx_opt.take().unwrap(),
141 )
142 }));
143
144 let err_msg: Option<String> = match result {
145 Ok(Ok(())) => None,
146 Ok(Err(e)) => Some(e.to_string()),
147 Err(panic) => Some(format!("panic: {panic:?}")),
148 };
149
150 if let Some(err) = err_msg {
151 eprintln!("Error in eval thread: {err}");
152 if let (Some(meta_tx), Some(body_tx)) = (meta_tx_opt.take(), body_tx_opt.take()) {
153 let _ = meta_tx.send(Response {
154 status: 500,
155 headers: std::collections::HashMap::new(),
156 body_type: ResponseBodyType::Normal,
157 });
158 let _ = body_tx.send((
159 Some("text/plain; charset=utf-8".to_string()),
160 ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
161 ));
162 }
163 }
164
165 {
167 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
168 jobs.remove_job(job_id);
169 }
170
171 Ok(())
172 });
173
174 (meta_rx, body_rx)
175}