http_nu/
worker.rs

1use crate::commands::RESPONSE_TX;
2use crate::request::{request_to_value, Request};
3use crate::response::{value_to_bytes, Response, ResponseBodyType, ResponseTransport};
4use nu_protocol::{
5    engine::{Job, ThreadJob},
6    PipelineData, Value,
7};
8use std::io::Read;
9use std::sync::{mpsc, Arc};
10use tokio::sync::{mpsc as tokio_mpsc, oneshot};
11
12type BoxError = Box<dyn std::error::Error + Send + Sync>;
13
14pub fn spawn_eval_thread(
15    engine: Arc<crate::Engine>,
16    request: Request,
17    stream: nu_protocol::ByteStream,
18) -> (
19    oneshot::Receiver<Response>,
20    oneshot::Receiver<(Option<String>, ResponseTransport)>,
21) {
22    let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
23    let (body_tx, body_rx) = tokio::sync::oneshot::channel();
24
25    fn inner(
26        engine: Arc<crate::Engine>,
27        request: Request,
28        stream: nu_protocol::ByteStream,
29        meta_tx: oneshot::Sender<Response>,
30        body_tx: oneshot::Sender<(Option<String>, ResponseTransport)>,
31    ) -> Result<(), BoxError> {
32        RESPONSE_TX.with(|tx| {
33            *tx.borrow_mut() = Some(meta_tx);
34        });
35        let result = engine.run_closure(
36            request_to_value(&request, nu_protocol::Span::unknown()),
37            stream.into(),
38        );
39        // Always clear the thread local storage after eval completes
40        RESPONSE_TX.with(|tx| {
41            let _ = tx.borrow_mut().take(); // This will drop the sender if it wasn't used
42        });
43        let output = result?;
44        let inferred_content_type = match &output {
45            PipelineData::Value(Value::Record { val, .. }, meta)
46                if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
47            {
48                if val.get("__html").is_some() {
49                    Some("text/html; charset=utf-8".to_string())
50                } else {
51                    Some("application/json".to_string())
52                }
53            }
54            PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
55                meta.as_ref().and_then(|m| m.content_type.clone())
56            }
57            _ => None,
58        };
59        match output {
60            PipelineData::Empty => {
61                let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
62                Ok(())
63            }
64            PipelineData::Value(Value::Nothing { .. }, _) => {
65                let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
66                Ok(())
67            }
68            PipelineData::Value(value, _) => {
69                let _ = body_tx.send((
70                    inferred_content_type,
71                    ResponseTransport::Full(value_to_bytes(value)),
72                ));
73                Ok(())
74            }
75            PipelineData::ListStream(stream, _) => {
76                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
77                let _ = body_tx.send((inferred_content_type, ResponseTransport::Stream(stream_rx)));
78                for value in stream.into_inner() {
79                    if stream_tx.blocking_send(value_to_bytes(value)).is_err() {
80                        break;
81                    }
82                }
83                Ok(())
84            }
85            PipelineData::ByteStream(stream, meta) => {
86                let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
87                let _ = body_tx.send((
88                    meta.as_ref().and_then(|m| m.content_type.clone()),
89                    ResponseTransport::Stream(stream_rx),
90                ));
91                let mut reader = stream
92                    .reader()
93                    .ok_or_else(|| "ByteStream has no reader".to_string())?;
94                let mut buf = vec![0; 8192];
95                loop {
96                    match reader.read(&mut buf) {
97                        Ok(0) => break, // EOF
98                        Ok(n) => {
99                            if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
100                                break;
101                            }
102                        }
103                        Err(err) => return Err(err.into()),
104                    }
105                }
106                Ok(())
107            }
108        }
109    }
110
111    // Create a thread job for this evaluation
112    let (sender, _receiver) = mpsc::channel();
113    let signals = engine.state.signals().clone();
114    let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
115
116    // Add the job to the engine's job table
117    let job_id = {
118        let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
119        jobs.add_job(Job::Thread(job.clone()))
120    };
121
122    std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
123        let mut meta_tx_opt = Some(meta_tx);
124        let mut body_tx_opt = Some(body_tx);
125
126        // Wrap the evaluation in catch_unwind so that panics don't poison the
127        // async runtime and we can still send a response back to the caller.
128        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
129            let mut local_engine = (*engine).clone();
130            local_engine.state.current_job.background_thread_job = Some(job);
131
132            // Take the senders for the inner call. If the evaluation completes
133            // successfully, these senders will have been consumed. Otherwise we
134            // will use the remaining ones to send an error response.
135            inner(
136                Arc::new(local_engine),
137                request,
138                stream,
139                meta_tx_opt.take().unwrap(),
140                body_tx_opt.take().unwrap(),
141            )
142        }));
143
144        let err_msg: Option<String> = match result {
145            Ok(Ok(())) => None,
146            Ok(Err(e)) => Some(e.to_string()),
147            Err(panic) => Some(format!("panic: {panic:?}")),
148        };
149
150        if let Some(err) = err_msg {
151            eprintln!("Error in eval thread: {err}");
152            if let (Some(meta_tx), Some(body_tx)) = (meta_tx_opt.take(), body_tx_opt.take()) {
153                let _ = meta_tx.send(Response {
154                    status: 500,
155                    headers: std::collections::HashMap::new(),
156                    body_type: ResponseBodyType::Normal,
157                });
158                let _ = body_tx.send((
159                    Some("text/plain; charset=utf-8".to_string()),
160                    ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
161                ));
162            }
163        }
164
165        // Clean up job when done
166        {
167            let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
168            jobs.remove_job(job_id);
169        }
170
171        Ok(())
172    });
173
174    (meta_rx, body_rx)
175}