1use crate::commands::RESPONSE_TX;
2use crate::logging::log_error;
3use crate::request::{request_to_value, Request};
4use crate::response::{value_to_bytes, Response, ResponseBodyType, ResponseTransport};
5use nu_protocol::{
6 engine::{Job, StateWorkingSet, ThreadJob},
7 format_cli_error, PipelineData, Value,
8};
9use std::io::Read;
10use std::sync::{mpsc, Arc};
11use tokio::sync::{mpsc as tokio_mpsc, oneshot};
12
13type BoxError = Box<dyn std::error::Error + Send + Sync>;
14
15pub fn spawn_eval_thread(
16 engine: Arc<crate::Engine>,
17 request: Request,
18 stream: nu_protocol::ByteStream,
19) -> (
20 oneshot::Receiver<Response>,
21 oneshot::Receiver<(Option<String>, ResponseTransport)>,
22) {
23 let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
24 let (body_tx, body_rx) = tokio::sync::oneshot::channel();
25
26 fn inner(
27 engine: Arc<crate::Engine>,
28 request: Request,
29 stream: nu_protocol::ByteStream,
30 meta_tx: oneshot::Sender<Response>,
31 body_tx: oneshot::Sender<(Option<String>, ResponseTransport)>,
32 ) -> Result<(), BoxError> {
33 RESPONSE_TX.with(|tx| {
34 *tx.borrow_mut() = Some(meta_tx);
35 });
36 let result = engine.run_closure(
37 request_to_value(&request, nu_protocol::Span::unknown()),
38 stream.into(),
39 );
40 RESPONSE_TX.with(|tx| {
42 let _ = tx.borrow_mut().take(); });
44 let output = result?;
45 let inferred_content_type = match &output {
46 PipelineData::Value(Value::Record { val, .. }, meta)
47 if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
48 {
49 if val.get("__html").is_some() {
50 Some("text/html; charset=utf-8".to_string())
51 } else {
52 Some("application/json".to_string())
53 }
54 }
55 PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
56 meta.as_ref().and_then(|m| m.content_type.clone())
57 }
58 _ => None,
59 };
60 match output {
61 PipelineData::Empty => {
62 let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
63 Ok(())
64 }
65 PipelineData::Value(Value::Nothing { .. }, _) => {
66 let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
67 Ok(())
68 }
69 PipelineData::Value(Value::Error { error, .. }, _) => {
70 let working_set = StateWorkingSet::new(&engine.state);
71 Err(format_cli_error(&working_set, error.as_ref(), None).into())
72 }
73 PipelineData::Value(value, _) => {
74 let _ = body_tx.send((
75 inferred_content_type,
76 ResponseTransport::Full(value_to_bytes(value)),
77 ));
78 Ok(())
79 }
80 PipelineData::ListStream(stream, _) => {
81 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
82 let _ = body_tx.send((inferred_content_type, ResponseTransport::Stream(stream_rx)));
83 for value in stream.into_inner() {
84 if let Value::Error { error, .. } = &value {
86 let working_set = StateWorkingSet::new(&engine.state);
87 log_error(&format_cli_error(&working_set, error.as_ref(), None));
88 break;
89 }
90 if stream_tx.blocking_send(value_to_bytes(value)).is_err() {
91 break;
92 }
93 }
94 Ok(())
95 }
96 PipelineData::ByteStream(stream, meta) => {
97 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
98 let _ = body_tx.send((
99 meta.as_ref().and_then(|m| m.content_type.clone()),
100 ResponseTransport::Stream(stream_rx),
101 ));
102 let mut reader = stream
103 .reader()
104 .ok_or_else(|| "ByteStream has no reader".to_string())?;
105 let mut buf = vec![0; 8192];
106 loop {
107 match reader.read(&mut buf) {
108 Ok(0) => break, Ok(n) => {
110 if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
111 break;
112 }
113 }
114 Err(err) => {
115 use nu_protocol::shell_error::bridge::ShellErrorBridge;
117 if let Some(bridge) = err
118 .get_ref()
119 .and_then(|e| e.downcast_ref::<ShellErrorBridge>())
120 {
121 let working_set = StateWorkingSet::new(&engine.state);
122 log_error(&format_cli_error(&working_set, &bridge.0, None));
123 break; }
125 return Err(err.into());
126 }
127 }
128 }
129 Ok(())
130 }
131 }
132 }
133
134 let (sender, _receiver) = mpsc::channel();
136 let signals = engine.state.signals().clone();
137 let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
138
139 let job_id = {
141 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
142 jobs.add_job(Job::Thread(job.clone()))
143 };
144
145 std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
146 let mut meta_tx_opt = Some(meta_tx);
147 let mut body_tx_opt = Some(body_tx);
148
149 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
152 let mut local_engine = (*engine).clone();
153 local_engine.state.current_job.background_thread_job = Some(job);
154
155 inner(
159 Arc::new(local_engine),
160 request,
161 stream,
162 meta_tx_opt.take().unwrap(),
163 body_tx_opt.take().unwrap(),
164 )
165 }));
166
167 let err_msg: Option<String> = match result {
168 Ok(Ok(())) => None,
169 Ok(Err(e)) => Some(e.to_string()),
170 Err(panic) => Some(format!("panic: {panic:?}")),
171 };
172
173 if let Some(err) = err_msg {
174 log_error(&err);
175 if let (Some(meta_tx), Some(body_tx)) = (meta_tx_opt.take(), body_tx_opt.take()) {
176 let _ = meta_tx.send(Response {
177 status: 500,
178 headers: std::collections::HashMap::new(),
179 body_type: ResponseBodyType::Normal,
180 });
181 let _ = body_tx.send((
182 Some("text/plain; charset=utf-8".to_string()),
183 ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
184 ));
185 }
186 }
187
188 {
190 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
191 jobs.remove_job(job_id);
192 }
193
194 Ok(())
195 });
196
197 (meta_rx, body_rx)
198}