1use crate::commands::RESPONSE_TX;
2use crate::request::{request_to_value, Request};
3use crate::response::{value_to_bytes, Response, ResponseBodyType, ResponseTransport};
4use nu_protocol::{
5 engine::{Job, ThreadJob},
6 PipelineData, Value,
7};
8use std::io::Read;
9use std::sync::{mpsc, Arc};
10use tokio::sync::{mpsc as tokio_mpsc, oneshot};
11
/// Boxed error trait object with `Send + Sync` bounds; used as the error type
/// of the evaluation helper nested inside `spawn_eval_thread`.
12type BoxError = Box<dyn std::error::Error + Send + Sync>;
13
14pub fn spawn_eval_thread(
15 engine: Arc<crate::Engine>,
16 request: Request,
17 stream: nu_protocol::ByteStream,
18) -> (
19 oneshot::Receiver<Response>,
20 oneshot::Receiver<(Option<String>, ResponseTransport)>,
21) {
22 let (meta_tx, meta_rx) = tokio::sync::oneshot::channel();
23 let (body_tx, body_rx) = tokio::sync::oneshot::channel();
24
25 fn inner(
26 engine: Arc<crate::Engine>,
27 request: Request,
28 stream: nu_protocol::ByteStream,
29 meta_tx: oneshot::Sender<Response>,
30 body_tx: oneshot::Sender<(Option<String>, ResponseTransport)>,
31 ) -> Result<(), BoxError> {
32 RESPONSE_TX.with(|tx| {
33 *tx.borrow_mut() = Some(meta_tx);
34 });
35 let result = engine.eval(
36 request_to_value(&request, nu_protocol::Span::unknown()),
37 stream.into(),
38 );
39 RESPONSE_TX.with(|tx| {
41 let _ = tx.borrow_mut().take(); });
43 let output = result?;
44 let inferred_content_type = match &output {
45 PipelineData::Value(Value::Record { .. }, meta)
46 if meta.as_ref().and_then(|m| m.content_type.clone()).is_none() =>
47 {
48 Some("application/json".to_string())
49 }
50 PipelineData::Value(_, meta) | PipelineData::ListStream(_, meta) => {
51 meta.as_ref().and_then(|m| m.content_type.clone())
52 }
53 _ => None,
54 };
55 match output {
56 PipelineData::Empty => {
57 let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
58 Ok(())
59 }
60 PipelineData::Value(Value::Nothing { .. }, _) => {
61 let _ = body_tx.send((inferred_content_type, ResponseTransport::Empty));
62 Ok(())
63 }
64 PipelineData::Value(value, _) => {
65 let _ = body_tx.send((
66 inferred_content_type,
67 ResponseTransport::Full(value_to_bytes(value)),
68 ));
69 Ok(())
70 }
71 PipelineData::ListStream(stream, _) => {
72 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
73 let _ = body_tx.send((inferred_content_type, ResponseTransport::Stream(stream_rx)));
74 for value in stream.into_inner() {
75 if stream_tx.blocking_send(value_to_bytes(value)).is_err() {
76 break;
77 }
78 }
79 Ok(())
80 }
81 PipelineData::ByteStream(stream, meta) => {
82 let (stream_tx, stream_rx) = tokio_mpsc::channel(32);
83 let _ = body_tx.send((
84 meta.as_ref().and_then(|m| m.content_type.clone()),
85 ResponseTransport::Stream(stream_rx),
86 ));
87 let mut reader = stream
88 .reader()
89 .ok_or_else(|| "ByteStream has no reader".to_string())?;
90 let mut buf = vec![0; 8192];
91 loop {
92 match reader.read(&mut buf) {
93 Ok(0) => break, Ok(n) => {
95 if stream_tx.blocking_send(buf[..n].to_vec()).is_err() {
96 break;
97 }
98 }
99 Err(err) => return Err(err.into()),
100 }
101 }
102 Ok(())
103 }
104 }
105 }
106
107 let (sender, _receiver) = mpsc::channel();
109 let signals = engine.state.signals().clone();
110 let job = ThreadJob::new(signals, Some("HTTP Request".to_string()), sender);
111
112 let job_id = {
114 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
115 jobs.add_job(Job::Thread(job.clone()))
116 };
117
118 std::thread::spawn(move || -> Result<(), std::convert::Infallible> {
119 let mut meta_tx_opt = Some(meta_tx);
120 let mut body_tx_opt = Some(body_tx);
121
122 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
125 let mut local_engine = (*engine).clone();
126 local_engine.state.current_job.background_thread_job = Some(job);
127
128 inner(
132 Arc::new(local_engine),
133 request,
134 stream,
135 meta_tx_opt.take().unwrap(),
136 body_tx_opt.take().unwrap(),
137 )
138 }));
139
140 let err_msg: Option<String> = match result {
141 Ok(Ok(())) => None,
142 Ok(Err(e)) => Some(e.to_string()),
143 Err(panic) => Some(format!("panic: {panic:?}")),
144 };
145
146 if let Some(err) = err_msg {
147 eprintln!("Error in eval thread: {err}");
148 if let (Some(meta_tx), Some(body_tx)) = (meta_tx_opt.take(), body_tx_opt.take()) {
149 let _ = meta_tx.send(Response {
150 status: 500,
151 headers: std::collections::HashMap::new(),
152 body_type: ResponseBodyType::Normal,
153 });
154 let _ = body_tx.send((
155 Some("text/plain; charset=utf-8".to_string()),
156 ResponseTransport::Full(format!("Script error: {err}").into_bytes()),
157 ));
158 }
159 }
160
161 {
163 let mut jobs = engine.state.jobs.lock().expect("jobs mutex poisoned");
164 jobs.remove_job(job_id);
165 }
166
167 Ok(())
168 });
169
170 (meta_rx, body_rx)
171}