Skip to main content

http_nu/
worker.rs

1use crate::commands::RESPONSE_TX;
2use crate::logging::log_error;
3use crate::request::{request_to_value, Request};
4use crate::response::{
5    extract_http_response_meta, value_to_bytes, value_to_json, HttpResponseMeta, Response,
6    ResponseTransport,
7};
8use nu_protocol::{
9    engine::{Job, StateWorkingSet, ThreadJob},
10    format_cli_error, PipelineData, Value,
11};
12use std::io::Read;
13use std::sync::{mpsc, Arc};
14use tokio::sync::{mpsc as tokio_mpsc, oneshot};
15
16/// Check if a value is a record without __html field
17fn is_jsonl_record(value: &Value) -> bool {
18    matches!(value, Value::Record { val, .. } if val.get("__html").is_none())
19}
20
21type BoxError = Box<dyn std::error::Error + Send + Sync>;
22
23/// Result of pipeline evaluation containing content-type, HTTP response metadata, and body
24pub type PipelineResult = (Option<String>, HttpResponseMeta, ResponseTransport);
25
26pub fn spawn_eval_thread(
27    engine: Arc<crate::Engine>,
28    request: Request,
29    stream: nu_protocol::ByteStream,
30) -> (
31    oneshot::Receiver<Response>,
32    oneshot::Receiver<PipelineResult>,
33) {
34    let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
35    let (body_tx, body_rx) = tokio::sync::oneshot::channel();
36
37    fn inner(
38        engine: Arc<crate::Engine>,
39        request: Request,
40        stream: nu_protocol::ByteStream,
41        meta_tx: oneshot::Sender<Response>,
42        body_tx: oneshot::Sender<PipelineResult>,
43    ) -> Result<(), BoxError> {
44        RESPONSE_TX.with(|tx| {
45            *tx.borrow_mut() = Some(meta_tx);
46        });
47        let result = engine.run_closure(
48            request_to_value(&request, nu_protocol::Span::unknown()),
49            stream.into(),
50        );
51        // Always clear the thread local storage after eval completes
52        RESPONSE_TX.with(|tx| {
53            let _ = tx.borrow_mut().take(); // This will drop the sender if it wasn't used
54        });
55        let output = result?;
56
57        // Content-type inference (when pipeline metadata has no content-type):
58        //
59        // | Value type       | Content-Type           | Conversion          |
60        // |------------------|------------------------|---------------------|
61        // | Record (__html)  | text/html              | unwrap __html       |
62        // | Record           | application/json       | JSON object         |
63        // | List             | application/json       | JSON array          |
64        // | Binary           | application/octet-stream | raw bytes         |
65        // | Empty/Nothing    | None (no header)       | empty               |
66        // | ListStream       | application/x-ndjson   | JSONL (if records)  |
67        // | Other            | text/html (default)    | .to_string()        |
68        //
69        let inferred_content_type = match &output {
70            PipelineData::Value(Value::Record { val, .. }, meta)
71                if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
72            {
73                if val.get("__html").is_some() {
74                    Some("text/html; charset=utf-8".to_string())
75                } else {
76                    Some("application/json".to_string())
77                }
78            }
79            PipelineData::Value(Value::List { .. }, meta)
80                if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
81            {
82                Some("application/json".to_string())
83            }
84            PipelineData::Value(Value::Binary { .. }, meta)
85                if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
86            {
87                Some("application/octet-stream".to_string())
88            }
89            PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
90                meta.as_ref().and_then(|m| m.content_type.clone())
91            }
92            _ => None,
93        };
94        match output {
95            PipelineData::Empty => {
96                let _ = body_tx.send((
97                    inferred_content_type,
98                    HttpResponseMeta::default(),
99                    ResponseTransport::Empty,
100                ));
101                Ok(())
102            }
103            PipelineData::Value(Value::Nothing { .. }, meta) => {
104                let http_meta = extract_http_response_meta(meta.as_ref());
105                let _ = body_tx.send((inferred_content_type, http_meta, ResponseTransport::Empty));
106                Ok(())
107            }
108            PipelineData::Value(Value::Error { error, .. }, _) => {
109                let working_set = StateWorkingSet::new(&engine.state);
110                Err(format_cli_error(None, &working_set, error.as_ref(), None).into())
111            }
112            PipelineData::Value(value, meta) => {
113                let http_meta = extract_http_response_meta(meta.as_ref());
114                let _ = body_tx.send((
115                    inferred_content_type,
116                    http_meta,
117                    ResponseTransport::Full(value_to_bytes(value)),
118                ));
119                Ok(())
120            }
121            PipelineData::ListStream(stream, meta) => {
122                let http_meta = extract_http_response_meta(meta.as_ref());
123                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
124                let mut iter = stream.into_inner();
125
126                // Peek first value to determine mode
127                let first = iter.next();
128                let use_jsonl = first.as_ref().is_some_and(is_jsonl_record);
129                let content_type = if use_jsonl {
130                    Some("application/x-ndjson".to_string())
131                } else {
132                    inferred_content_type
133                };
134
135                let _ = body_tx.send((
136                    content_type,
137                    http_meta,
138                    ResponseTransport::Stream(stream_rx),
139                ));
140
141                // Helper to send a value
142                let send_value = |stream_tx: &tokio_mpsc::Sender<Vec<u8>>, value: Value| -> bool {
143                    let bytes = if use_jsonl {
144                        let mut line =
145                            serde_json::to_vec(&value_to_json(&value)).unwrap_or_default();
146                        line.push(b'\n');
147                        line
148                    } else {
149                        value_to_bytes(value)
150                    };
151                    stream_tx.blocking_send(bytes).is_ok()
152                };
153
154                // Process first value
155                if let Some(value) = first {
156                    if let Value::Error { error, .. } = &value {
157                        let working_set = StateWorkingSet::new(&engine.state);
158                        log_error(&format_cli_error(None, &working_set, error.as_ref(), None));
159                        return Ok(());
160                    }
161                    if !send_value(&stream_tx, value) {
162                        return Ok(());
163                    }
164                }
165
166                // Process remaining values
167                for value in iter {
168                    if let Value::Error { error, .. } = &value {
169                        let working_set = StateWorkingSet::new(&engine.state);
170                        log_error(&format_cli_error(None, &working_set, error.as_ref(), None));
171                        break;
172                    }
173                    if !send_value(&stream_tx, value) {
174                        break;
175                    }
176                }
177                Ok(())
178            }
179            PipelineData::ByteStream(stream, meta) => {
180                let http_meta = extract_http_response_meta(meta.as_ref());
181                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
182                let content_type = meta
183                    .as_ref()
184                    .and_then(|m| m.content_type.clone())
185                    .or_else(|| Some("application/octet-stream".to_string()));
186                let _ = body_tx.send((
187                    content_type,
188                    http_meta,
189                    ResponseTransport::Stream(stream_rx),
190                ));
191                let mut reader = stream
192                    .reader()
193                    .ok_or_else(|| "ByteStream has no reader".to_string())?;
194                let mut buf = vec![0; 8192];
195                loop {
196                    match reader.read(&mut buf) {
197                        Ok(0) => break, // EOF
198                        Ok(n) => {
199                            if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
200                                break;
201                            }
202                        }
203                        Err(err) => {
204                            // Try to extract ShellError from the io::Error for proper formatting
205                            use nu_protocol::shell_error::bridge::ShellErrorBridge;
206                            if let Some(bridge) = err
207                                .get_ref()
208                                .and_then(|e| e.downcast_ref::<ShellErrorBridge>())
209                            {
210                                let working_set = StateWorkingSet::new(&engine.state);
211                                log_error(&format_cli_error(None, &working_set, &bridge.0, None));
212                                break; // Error already logged, just stop streaming
213                            }
214                            return Err(err.into());
215                        }
216                    }
217                }
218                Ok(())
219            }
220        }
221    }
222
223    // Create a thread job for this evaluation
224    let (sender, _receiver) = mpsc::channel();
225    let signals = engine.state.signals().clone();
226    let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
227
228    // Add the job to the engine's job table
229    let job_id = {
230        let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
231        jobs.add_job(Job::Thread(job.clone()))
232    };
233
234    std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
235        let mut meta_tx_opt = Some(meta_tx);
236        let mut body_tx_opt = Some(body_tx);
237
238        // Wrap the evaluation in catch_unwind so that panics don't poison the
239        // async runtime and we can still send a response back to the caller.
240        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
241            let mut local_engine = (*engine).clone();
242            local_engine.state.current_job.background_thread_job = Some(job);
243
244            // Take the senders for the inner call. If the evaluation completes
245            // successfully, these senders will have been consumed. Otherwise we
246            // will use the remaining ones to send an error response.
247            inner(
248                Arc::new(local_engine),
249                request,
250                stream,
251                meta_tx_opt.take().unwrap(),
252                body_tx_opt.take().unwrap(),
253            )
254        }));
255
256        let err_msg: Option<String> = match result {
257            Ok(Ok(())) => None,
258            Ok(Err(e)) => Some(e.to_string()),
259            Err(panic) => Some(format!("panic: {panic:?}")),
260        };
261
262        if let Some(err) = err_msg {
263            log_error(&err);
264            // Drop meta_tx - we don't use it for normal responses anymore
265            // (only .static and .reverse-proxy use it)
266            drop(meta_tx_opt.take());
267            if let Some(body_tx) = body_tx_opt.take() {
268                let error_meta = HttpResponseMeta {
269                    status: Some(500),
270                    headers: std::collections::HashMap::new(),
271                };
272                let _ = body_tx.send((
273                    Some("text/plain; charset=utf-8".to_string()),
274                    error_meta,
275                    ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
276                ));
277            }
278        }
279
280        // Clean up job when done
281        {
282            let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
283            jobs.remove_job(job_id);
284        }
285
286        Ok(())
287    });
288
289    (meta_rx, body_rx)
290}