1use crate::commands::RESPONSE_TX;
2use crate::logging::log_error;
3use crate::request::{request_to_value, Request};
4use crate::response::{
5 extract_http_response_meta, value_to_bytes, value_to_json, HttpResponseMeta, Response,
6 ResponseTransport,
7};
8use nu_protocol::{
9 engine::{Job, StateWorkingSet, ThreadJob},
10 format_cli_error, PipelineData, Value,
11};
12use std::io::Read;
13use std::sync::{mpsc, Arc};
14use tokio::sync::{mpsc as tokio_mpsc, oneshot};
15
16fn is_jsonl_record(value: &Value) -> bool {
18 matches!(value, Value::Record { val, .. } if val.get("__html").is_none())
19}
20
/// Boxed error that can cross thread boundaries; used as the error type of the
/// fallible evaluation steps in this module.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
22
23pub type PipelineResult = (Option<String>, HttpResponseMeta, ResponseTransport);
25
26pub fn spawn_eval_thread(
27 engine: Arc<crate::Engine>,
28 request: Request,
29 stream: nu_protocol::ByteStream,
30) -> (
31 oneshot::Receiver<Response>,
32 oneshot::Receiver<PipelineResult>,
33) {
34 let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
35 let (body_tx, body_rx) = tokio::sync::oneshot::channel();
36
37 fn inner(
38 engine: Arc<crate::Engine>,
39 request: Request,
40 stream: nu_protocol::ByteStream,
41 meta_tx: oneshot::Sender<Response>,
42 body_tx: oneshot::Sender<PipelineResult>,
43 ) -> Result<(), BoxError> {
44 RESPONSE_TX.with(|tx| {
45 *tx.borrow_mut() = Some(meta_tx);
46 });
47 let result = engine.run_closure(
48 request_to_value(&request, nu_protocol::Span::unknown()),
49 stream.into(),
50 );
51 RESPONSE_TX.with(|tx| {
53 let _ = tx.borrow_mut().take(); });
55 let output = result?;
56
57 let inferred_content_type = match &output {
70 PipelineData::Value(Value::Record { val, .. }, meta)
71 if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
72 {
73 if val.get("__html").is_some() {
74 Some("text/html; charset=utf-8".to_string())
75 } else {
76 Some("application/json".to_string())
77 }
78 }
79 PipelineData::Value(Value::List { .. }, meta)
80 if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
81 {
82 Some("application/json".to_string())
83 }
84 PipelineData::Value(Value::Binary { .. }, meta)
85 if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
86 {
87 Some("application/octet-stream".to_string())
88 }
89 PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
90 meta.as_ref().and_then(|m| m.content_type.clone())
91 }
92 _ => None,
93 };
94 match output {
95 PipelineData::Empty => {
96 let _ = body_tx.send((
97 inferred_content_type,
98 HttpResponseMeta::default(),
99 ResponseTransport::Empty,
100 ));
101 Ok(())
102 }
103 PipelineData::Value(Value::Nothing { .. }, meta) => {
104 let http_meta = extract_http_response_meta(meta.as_ref());
105 let _ = body_tx.send((inferred_content_type, http_meta, ResponseTransport::Empty));
106 Ok(())
107 }
108 PipelineData::Value(Value::Error { error, .. }, _) => {
109 let working_set = StateWorkingSet::new(&engine.state);
110 Err(format_cli_error(None, &working_set, error.as_ref(), None).into())
111 }
112 PipelineData::Value(value, meta) => {
113 let http_meta = extract_http_response_meta(meta.as_ref());
114 let _ = body_tx.send((
115 inferred_content_type,
116 http_meta,
117 ResponseTransport::Full(value_to_bytes(value)),
118 ));
119 Ok(())
120 }
121 PipelineData::ListStream(stream, meta) => {
122 let http_meta = extract_http_response_meta(meta.as_ref());
123 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
124 let mut iter = stream.into_inner();
125
126 let first = iter.next();
128 let use_jsonl = first.as_ref().is_some_and(is_jsonl_record);
129 let content_type = if use_jsonl {
130 Some("application/x-ndjson".to_string())
131 } else {
132 inferred_content_type
133 };
134
135 let _ = body_tx.send((
136 content_type,
137 http_meta,
138 ResponseTransport::Stream(stream_rx),
139 ));
140
141 let send_value = |stream_tx: &tokio_mpsc::Sender<Vec<u8>>, value: Value| -> bool {
143 let bytes = if use_jsonl {
144 let mut line =
145 serde_json::to_vec(&value_to_json(&value)).unwrap_or_default();
146 line.push(b'\n');
147 line
148 } else {
149 value_to_bytes(value)
150 };
151 stream_tx.blocking_send(bytes).is_ok()
152 };
153
154 if let Some(value) = first {
156 if let Value::Error { error, .. } = &value {
157 let working_set = StateWorkingSet::new(&engine.state);
158 log_error(&format_cli_error(None, &working_set, error.as_ref(), None));
159 return Ok(());
160 }
161 if !send_value(&stream_tx, value) {
162 return Ok(());
163 }
164 }
165
166 for value in iter {
168 if let Value::Error { error, .. } = &value {
169 let working_set = StateWorkingSet::new(&engine.state);
170 log_error(&format_cli_error(None, &working_set, error.as_ref(), None));
171 break;
172 }
173 if !send_value(&stream_tx, value) {
174 break;
175 }
176 }
177 Ok(())
178 }
179 PipelineData::ByteStream(stream, meta) => {
180 let http_meta = extract_http_response_meta(meta.as_ref());
181 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
182 let content_type = meta
183 .as_ref()
184 .and_then(|m| m.content_type.clone())
185 .or_else(|| Some("application/octet-stream".to_string()));
186 let _ = body_tx.send((
187 content_type,
188 http_meta,
189 ResponseTransport::Stream(stream_rx),
190 ));
191 let mut reader = stream
192 .reader()
193 .ok_or_else(|| "ByteStream has no reader".to_string())?;
194 let mut buf = vec![0; 8192];
195 loop {
196 match reader.read(&mut buf) {
197 Ok(0) => break, Ok(n) => {
199 if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
200 break;
201 }
202 }
203 Err(err) => {
204 use nu_protocol::shell_error::bridge::ShellErrorBridge;
206 if let Some(bridge) = err
207 .get_ref()
208 .and_then(|e| e.downcast_ref::<ShellErrorBridge>())
209 {
210 let working_set = StateWorkingSet::new(&engine.state);
211 log_error(&format_cli_error(None, &working_set, &bridge.0, None));
212 break; }
214 return Err(err.into());
215 }
216 }
217 }
218 Ok(())
219 }
220 }
221 }
222
223 let (sender, _receiver) = mpsc::channel();
225 let signals = engine.state.signals().clone();
226 let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
227
228 let job_id = {
230 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
231 jobs.add_job(Job::Thread(job.clone()))
232 };
233
234 std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
235 let mut meta_tx_opt = Some(meta_tx);
236 let mut body_tx_opt = Some(body_tx);
237
238 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
241 let mut local_engine = (*engine).clone();
242 local_engine.state.current_job.background_thread_job = Some(job);
243
244 inner(
248 Arc::new(local_engine),
249 request,
250 stream,
251 meta_tx_opt.take().unwrap(),
252 body_tx_opt.take().unwrap(),
253 )
254 }));
255
256 let err_msg: Option<String> = match result {
257 Ok(Ok(())) => None,
258 Ok(Err(e)) => Some(e.to_string()),
259 Err(panic) => Some(format!("panic: {panic:?}")),
260 };
261
262 if let Some(err) = err_msg {
263 log_error(&err);
264 drop(meta_tx_opt.take());
267 if let Some(body_tx) = body_tx_opt.take() {
268 let error_meta = HttpResponseMeta {
269 status: Some(500),
270 headers: std::collections::HashMap::new(),
271 };
272 let _ = body_tx.send((
273 Some("text/plain; charset=utf-8".to_string()),
274 error_meta,
275 ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
276 ));
277 }
278 }
279
280 {
282 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
283 jobs.remove_job(job_id);
284 }
285
286 Ok(())
287 });
288
289 (meta_rx, body_rx)
290}