http_nu/
worker.rs

1use crate::commands::RESPONSE_TX;
2use crate::logging::log_error;
3use crate::request::{request_to_value, Request};
4use crate::response::{value_to_bytes, Response, ResponseBodyType, ResponseTransport};
5use nu_protocol::{
6    engine::{Job, StateWorkingSet, ThreadJob},
7    format_cli_error, PipelineData, Value,
8};
9use std::io::Read;
10use std::sync::{mpsc, Arc};
11use tokio::sync::{mpsc as tokio_mpsc, oneshot};
12
13type BoxError = Box<dyn std::error::Error + Send + Sync>;
14
15pub fn spawn_eval_thread(
16    engine: Arc<crate::Engine>,
17    request: Request,
18    stream: nu_protocol::ByteStream,
19) -> (
20    oneshot::Receiver<Response>,
21    oneshot::Receiver<(Option<String>, ResponseTransport)>,
22) {
23    let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
24    let (body_tx, body_rx) = tokio::sync::oneshot::channel();
25
26    fn inner(
27        engine: Arc<crate::Engine>,
28        request: Request,
29        stream: nu_protocol::ByteStream,
30        meta_tx: oneshot::Sender<Response>,
31        body_tx: oneshot::Sender<(Option<String>, ResponseTransport)>,
32    ) -> Result<(), BoxError> {
33        RESPONSE_TX.with(|tx| {
34            *tx.borrow_mut() = Some(meta_tx);
35        });
36        let result = engine.run_closure(
37            request_to_value(&request, nu_protocol::Span::unknown()),
38            stream.into(),
39        );
40        // Always clear the thread local storage after eval completes
41        RESPONSE_TX.with(|tx| {
42            let _ = tx.borrow_mut().take(); // This will drop the sender if it wasn't used
43        });
44        let output = result?;
45        let inferred_content_type = match &output {
46            PipelineData::Value(Value::Record { val, .. }, meta)
47                if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
48            {
49                if val.get("__html").is_some() {
50                    Some("text/html; charset=utf-8".to_string())
51                } else {
52                    Some("application/json".to_string())
53                }
54            }
55            PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
56                meta.as_ref().and_then(|m| m.content_type.clone())
57            }
58            _ => None,
59        };
60        match output {
61            PipelineData::Empty => {
62                let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
63                Ok(())
64            }
65            PipelineData::Value(Value::Nothing { .. }, _) => {
66                let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
67                Ok(())
68            }
69            PipelineData::Value(Value::Error { error, .. }, _) => {
70                let working_set = StateWorkingSet::new(&engine.state);
71                Err(format_cli_error(&working_set, error.as_ref(), None).into())
72            }
73            PipelineData::Value(value, _) => {
74                let _ = body_tx.send((
75                    inferred_content_type,
76                    ResponseTransport::Full(value_to_bytes(value)),
77                ));
78                Ok(())
79            }
80            PipelineData::ListStream(stream, _) => {
81                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
82                let _ = body_tx.send((inferred_content_type, ResponseTransport::Stream(stream_rx)));
83                for value in stream.into_inner() {
84                    // Check for errors in the stream and log them properly
85                    if let Value::Error { error, .. } = &value {
86                        let working_set = StateWorkingSet::new(&engine.state);
87                        log_error(&format_cli_error(&working_set, error.as_ref(), None));
88                        break;
89                    }
90                    if stream_tx.blocking_send(value_to_bytes(value)).is_err() {
91                        break;
92                    }
93                }
94                Ok(())
95            }
96            PipelineData::ByteStream(stream, meta) => {
97                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
98                let _ = body_tx.send((
99                    meta.as_ref().and_then(|m| m.content_type.clone()),
100                    ResponseTransport::Stream(stream_rx),
101                ));
102                let mut reader = stream
103                    .reader()
104                    .ok_or_else(|| "ByteStream has no reader".to_string())?;
105                let mut buf = vec![0; 8192];
106                loop {
107                    match reader.read(&mut buf) {
108                        Ok(0) => break, // EOF
109                        Ok(n) => {
110                            if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
111                                break;
112                            }
113                        }
114                        Err(err) => {
115                            // Try to extract ShellError from the io::Error for proper formatting
116                            use nu_protocol::shell_error::bridge::ShellErrorBridge;
117                            if let Some(bridge) = err
118                                .get_ref()
119                                .and_then(|e| e.downcast_ref::<ShellErrorBridge>())
120                            {
121                                let working_set = StateWorkingSet::new(&engine.state);
122                                log_error(&format_cli_error(&working_set, &bridge.0, None));
123                                break; // Error already logged, just stop streaming
124                            }
125                            return Err(err.into());
126                        }
127                    }
128                }
129                Ok(())
130            }
131        }
132    }
133
134    // Create a thread job for this evaluation
135    let (sender, _receiver) = mpsc::channel();
136    let signals = engine.state.signals().clone();
137    let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
138
139    // Add the job to the engine's job table
140    let job_id = {
141        let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
142        jobs.add_job(Job::Thread(job.clone()))
143    };
144
145    std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
146        let mut meta_tx_opt = Some(meta_tx);
147        let mut body_tx_opt = Some(body_tx);
148
149        // Wrap the evaluation in catch_unwind so that panics don't poison the
150        // async runtime and we can still send a response back to the caller.
151        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
152            let mut local_engine = (*engine).clone();
153            local_engine.state.current_job.background_thread_job = Some(job);
154
155            // Take the senders for the inner call. If the evaluation completes
156            // successfully, these senders will have been consumed. Otherwise we
157            // will use the remaining ones to send an error response.
158            inner(
159                Arc::new(local_engine),
160                request,
161                stream,
162                meta_tx_opt.take().unwrap(),
163                body_tx_opt.take().unwrap(),
164            )
165        }));
166
167        let err_msg: Option<String> = match result {
168            Ok(Ok(())) => None,
169            Ok(Err(e)) => Some(e.to_string()),
170            Err(panic) => Some(format!("panic: {panic:?}")),
171        };
172
173        if let Some(err) = err_msg {
174            log_error(&err);
175            if let (Some(meta_tx), Some(body_tx)) = (meta_tx_opt.take(), body_tx_opt.take()) {
176                let _ = meta_tx.send(Response {
177                    status: 500,
178                    headers: std::collections::HashMap::new(),
179                    body_type: ResponseBodyType::Normal,
180                });
181                let _ = body_tx.send((
182                    Some("text/plain; charset=utf-8".to_string()),
183                    ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
184                ));
185            }
186        }
187
188        // Clean up job when done
189        {
190            let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
191            jobs.remove_job(job_id);
192        }
193
194        Ok(())
195    });
196
197    (meta_rx, body_rx)
198}