http_nu/
worker.rs

1use crate::commands::RESPONSE_TX;
2use crate::request::{request_to_value, Request};
3use crate::response::{value_to_bytes, Response, ResponseBodyType, ResponseTransport};
4use nu_protocol::{
5    engine::{Job, ThreadJob},
6    PipelineData, Value,
7};
8use std::io::Read;
9use std::sync::{mpsc, Arc};
10use tokio::sync::{mpsc as tokio_mpsc, oneshot};
11
/// Boxed, thread-safe error type used for failures inside the evaluation
/// thread. Any `std::error::Error + Send + Sync` value, as well as `String`
/// and `&str` messages, converts into it through `?` / `.into()`.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
13
14pub fn spawn_eval_thread(
15    engine: Arc<crate::Engine>,
16    request: Request,
17    stream: nu_protocol::ByteStream,
18) -> (
19    oneshot::Receiver<Response>,
20    oneshot::Receiver<(Option<String>, ResponseTransport)>,
21) {
22    let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
23    let (body_tx, body_rx) = tokio::sync::oneshot::channel();
24
25    fn inner(
26        engine: Arc<crate::Engine>,
27        request: Request,
28        stream: nu_protocol::ByteStream,
29        meta_tx: oneshot::Sender<Response>,
30        body_tx: oneshot::Sender<(Option<String>, ResponseTransport)>,
31    ) -> Result<(), BoxError> {
32        RESPONSE_TX.with(|tx| {
33            *tx.borrow_mut() = Some(meta_tx);
34        });
35        let result = engine.eval(
36            request_to_value(&request, nu_protocol::Span::unknown()),
37            stream.into(),
38        );
39        // Always clear the thread local storage after eval completes
40        RESPONSE_TX.with(|tx| {
41            let _ = tx.borrow_mut().take(); // This will drop the sender if it wasn't used
42        });
43        let output = result?;
44        let inferred_content_type = match &output {
45            PipelineData::Value(Value::Record { .. }, meta)
46                if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
47            {
48                Some("application/json".to_string())
49            }
50            PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
51                meta.as_ref().and_then(|m| m.content_type.clone())
52            }
53            _ => None,
54        };
55        match output {
56            PipelineData::Empty => {
57                let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
58                Ok(())
59            }
60            PipelineData::Value(Value::Nothing { .. }, _) => {
61                let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
62                Ok(())
63            }
64            PipelineData::Value(value, _) => {
65                let _ = body_tx.send((
66                    inferred_content_type,
67                    ResponseTransport::Full(value_to_bytes(value)),
68                ));
69                Ok(())
70            }
71            PipelineData::ListStream(stream, _) => {
72                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
73                let _ = body_tx.send((inferred_content_type, ResponseTransport::Stream(stream_rx)));
74                for value in stream.into_inner() {
75                    if stream_tx.blocking_send(value_to_bytes(value)).is_err() {
76                        break;
77                    }
78                }
79                Ok(())
80            }
81            PipelineData::ByteStream(stream, meta) => {
82                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
83                let _ = body_tx.send((
84                    meta.as_ref().and_then(|m| m.content_type.clone()),
85                    ResponseTransport::Stream(stream_rx),
86                ));
87                let mut reader = stream
88                    .reader()
89                    .ok_or_else(|| "ByteStream has no reader".to_string())?;
90                let mut buf = vec![0; 8192];
91                loop {
92                    match reader.read(&mut buf) {
93                        Ok(0) => break, // EOF
94                        Ok(n) => {
95                            if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
96                                break;
97                            }
98                        }
99                        Err(err) => return Err(err.into()),
100                    }
101                }
102                Ok(())
103            }
104        }
105    }
106
107    // Create a thread job for this evaluation
108    let (sender, _receiver) = mpsc::channel();
109    let signals = engine.state.signals().clone();
110    let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
111
112    // Add the job to the engine's job table
113    let job_id = {
114        let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
115        jobs.add_job(Job::Thread(job.clone()))
116    };
117
118    std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
119        let mut meta_tx_opt = Some(meta_tx);
120        let mut body_tx_opt = Some(body_tx);
121
122        // Wrap the evaluation in catch_unwind so that panics don't poison the
123        // async runtime and we can still send a response back to the caller.
124        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
125            let mut local_engine = (*engine).clone();
126            local_engine.state.current_job.background_thread_job = Some(job);
127
128            // Take the senders for the inner call. If the evaluation completes
129            // successfully, these senders will have been consumed. Otherwise we
130            // will use the remaining ones to send an error response.
131            inner(
132                Arc::new(local_engine),
133                request,
134                stream,
135                meta_tx_opt.take().unwrap(),
136                body_tx_opt.take().unwrap(),
137            )
138        }));
139
140        let err_msg: Option<String> = match result {
141            Ok(Ok(())) => None,
142            Ok(Err(e)) => Some(e.to_string()),
143            Err(panic) => Some(format!("panic: {panic:?}")),
144        };
145
146        if let Some(err) = err_msg {
147            eprintln!("Error in eval thread: {err}");
148            if let (Some(meta_tx), Some(body_tx)) = (meta_tx_opt.take(), body_tx_opt.take()) {
149                let _ = meta_tx.send(Response {
150                    status: 500,
151                    headers: std::collections::HashMap::new(),
152                    body_type: ResponseBodyType::Normal,
153                });
154                let _ = body_tx.send((
155                    Some("text/plain; charset=utf-8".to_string()),
156                    ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
157                ));
158            }
159        }
160
161        // Clean up job when done
162        {
163            let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
164            jobs.remove_job(job_id);
165        }
166
167        Ok(())
168    });
169
170    (meta_rx, body_rx)
171}