1use std::collections::HashMap;
2use std::env::{self, vars};
3use std::io::{BufRead, BufReader, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
6use std::sync::{
7 atomic::{AtomicU64, Ordering},
8 Arc, Mutex,
9};
10use std::time::Duration;
11use std::{fs, io, thread};
12
13use anyhow::{anyhow, Context, Result};
14use crossbeam_channel::{after, bounded, never, select, Receiver, Sender};
15use fnv::FnvHashMap;
16use serde_json::{from_str, to_string};
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use super::{ready_msg, Request, Response};
21use crate::core::{Processor, Task};
22
/// Name of the environment variable that, when set, holds the directory into
/// which unparseable responses from child `pid` are dumped.
pub fn dump_error_resp_env(pid: u32) -> String {
    let mut name = String::from("DUMP_ERR_RESP_");
    name.push_str(&pid.to_string());
    name
}
27
28fn start_response_handler<T: Task>(child_pid: u32, stdout: ChildStdout, in_flight_requests: InflightRequests<T::Output>) -> Result<()> {
29 let mut reader = BufReader::new(stdout);
30 let mut line_buf = String::new();
31
32 loop {
33 line_buf.clear();
34
35 let size = reader.read_line(&mut line_buf).context("read line from stdout")?;
36 if size == 0 {
37 error!("child exited");
38 return Err(io::Error::new(io::ErrorKind::BrokenPipe, "child process exit").into());
39 }
40
41 let resp: Response<T::Output> = match from_str(line_buf.as_str()) {
42 Ok(r) => r,
43 Err(_) => {
44 dump(&DumpType::from_env(child_pid), child_pid, line_buf.as_str());
45 continue;
46 }
47 };
48
49 debug!(id = resp.id, size, "response received");
50 in_flight_requests.complete(resp);
52 }
53}
54
55#[derive(Debug, Clone)]
57enum DumpType {
58 ToLog,
60 ToFile(PathBuf),
62}
63
64impl DumpType {
65 fn from_env(pid: u32) -> Self {
66 match env::var(dump_error_resp_env(pid)) {
67 Ok(path) => Self::ToFile(path.into()),
68 Err(_) => Self::ToLog,
69 }
70 }
71}
72
73fn dump(dt: &DumpType, child_pid: u32, data: &str) {
74 #[inline]
75 fn truncate(data: &str) -> String {
76 const TRUNCATE_SIZE: usize = 100;
77
78 if data.len() <= TRUNCATE_SIZE {
79 return data.to_string();
80 }
81
82 let trunc_len = (0..TRUNCATE_SIZE + 1).rposition(|index| data.is_char_boundary(index)).unwrap_or(0);
83 format!("{}...", &data[..trunc_len])
84 }
85
86 match dt {
87 DumpType::ToLog => {
88 error!(child_pid = child_pid, "failed to unmarshal response string: '{}'.", truncate(data));
89 }
90 DumpType::ToFile(dir) => match dump_to_file(child_pid, dir, data.as_bytes()) {
91 Ok(dump_file_path) => {
92 error!(
93 child_pid = child_pid,
94 "failed to unmarshal response string. dump file '{}' generated.",
95 dump_file_path.display()
96 )
97 }
98 Err(e) => {
99 error!(
100 child_pid = child_pid,
101 "failed to unmarshal response string; failed to generate dump file: '{}'.", e
102 );
103 }
104 },
105 }
106}
107
108fn dump_to_file(child_pid: u32, dir: impl AsRef<Path>, data: &[u8]) -> Result<PathBuf> {
109 #[inline]
110 fn ensure_dir(dir: &Path) -> Result<()> {
111 if !dir.exists() {
112 fs::create_dir_all(dir).with_context(|| format!("create directory '{}' for dump files", dir.display()))?;
113 } else if !dir.is_dir() {
114 return Err(anyhow!("'{}' is not directory", dir.display()));
115 }
116 Ok(())
117 }
118
119 let dir = dir.as_ref();
120 ensure_dir(dir)?;
121 let filename = format!("ext-processor-err-resp-{}-{}.json", child_pid, Uuid::new_v4().as_simple());
122 let path = dir.join(filename);
123 fs::write(&path, data)?;
124 Ok(path)
125}
126
/// Optional callbacks that `Producer::process` runs around each request.
pub struct Hooks<P, F> {
    /// Called with the request before it is sent. An `Err` aborts the request,
    /// and in that case the finalize hook is not run.
    pub prepare: Option<P>,
    /// Called with the request when `process` returns after a successful prepare
    /// step, whether the request succeeded or failed.
    pub finalize: Option<F>,
}
131
/// Builder that configures and spawns a `Producer` backed by a child process.
pub struct ProducerBuilder<HP, HF> {
    /// Executable to spawn.
    bin: PathBuf,
    /// Command-line arguments passed to `bin`.
    args: Vec<String>,
    /// Environment variables explicitly configured for the child.
    envs: HashMap<String, String>,

    /// Whether the parent's environment variables are merged with `envs` when spawning.
    inherit_envs: bool,
    /// Upper bound on waiting for the child's ready line; `None` waits indefinitely.
    stable_timeout: Option<Duration>,
    /// Prepare/finalize callbacks run around each request.
    hooks: Hooks<HP, HF>,
    /// Whether to respawn the child after its stdout closes.
    auto_restart: bool,
}
143
144impl<HP, HF> ProducerBuilder<HP, HF> {
145 pub fn new(bin: PathBuf, args: Vec<String>) -> Self {
147 ProducerBuilder {
148 bin,
149 args,
150 envs: HashMap::new(),
151 inherit_envs: true,
152 stable_timeout: None,
153 hooks: Hooks {
154 prepare: None,
155 finalize: None,
156 },
157 auto_restart: false,
158 }
159 }
160
161 pub fn inherit_envs(mut self, yes: bool) -> Self {
163 self.inherit_envs = yes;
164 self
165 }
166
167 pub fn env(mut self, name: String, value: String) -> Self {
169 self.envs.insert(name, value);
170 self
171 }
172
173 pub fn stable_timeout(mut self, timeout: Duration) -> Self {
176 self.stable_timeout.replace(timeout);
177 self
178 }
179
180 #[cfg(feature = "numa")]
182 pub fn numa_preferred(self, node: std::os::raw::c_int) -> Self {
183 self.env(crate::sys::numa::ENV_NUMA_PREFERRED.to_string(), node.to_string())
184 }
185
186 pub fn hook_prepare(mut self, f: HP) -> Self {
188 self.hooks.prepare.replace(f);
189 self
190 }
191
192 pub fn hook_finalize(mut self, f: HF) -> Self {
194 self.hooks.finalize.replace(f);
195 self
196 }
197
198 pub fn auto_restart(mut self, auto_restart: bool) -> Self {
200 self.auto_restart = auto_restart;
201 self
202 }
203
204 pub fn cpuset<S: Into<String>>(mut self, cgname: S, cpuset: S) -> Self {
206 self = self.env(crate::sys::cgroup::ENV_CGROUP_NAME.to_string(), cgname.into());
207 self.env(crate::sys::cgroup::ENV_CGROUP_CPUSET.to_string(), cpuset.into())
208 }
209
210 pub fn spawn<T: Task>(self) -> Result<Producer<T, HP, HF>> {
212 let ProducerBuilder {
213 bin,
214 args,
215 mut envs,
216 inherit_envs,
217 stable_timeout,
218 hooks,
219 auto_restart,
220 } = self;
221
222 if inherit_envs {
223 envs.extend(vars());
224 }
225
226 let cmd = move || {
227 let mut cmd = Command::new(&bin);
228 cmd.args(args.clone())
229 .envs(envs.clone())
230 .stdin(Stdio::piped())
231 .stdout(Stdio::piped())
232 .stderr(Stdio::inherit());
233 cmd
234 };
235
236 let (producer_inner, mut child_stdout) = ProducerInner::new(cmd(), T::STAGE, stable_timeout).context("create producer inner")?;
237 let child_pid = producer_inner.child_id();
238 let producer_inner = Arc::new(Mutex::new(producer_inner));
239 let in_flight_requests = InflightRequests::new();
240
241 let producer = Producer {
242 next_id: AtomicU64::new(1),
243 inner: producer_inner.clone(),
244 hooks,
245 in_flight_requests: in_flight_requests.clone(),
246 };
247
248 thread::spawn(move || {
249 loop {
250 if let Err(e) =
251 start_response_handler::<T>(child_pid, child_stdout, in_flight_requests.clone()).context("start response handler")
252 {
253 error!(err=?e, "failed to start response handler. pid: {}", child_pid);
254 }
255
256 in_flight_requests.cancel_all(format!("child process exited: {}", T::STAGE));
259
260 if !auto_restart {
261 break;
262 }
263 thread::sleep(Duration::from_secs(3));
264
265 let mut inner = producer_inner.lock().unwrap();
266 match inner.restart_child(cmd(), T::STAGE, stable_timeout) {
267 Ok(new_child_stdout) => {
268 child_stdout = new_child_stdout;
269 }
270 Err(e) => {
271 error!(err=?e, "unable to restart child process");
272 break;
273 }
274 }
275 }
276 });
277
278 Ok(producer)
279 }
280}
281
282fn wait_for_stable(stage: &'static str, stdout: ChildStdout, mut stable_timeout: Option<Duration>) -> Result<ChildStdout> {
283 fn inner(res_tx: Sender<Result<()>>, stage: &'static str, stdout: ChildStdout) -> thread::JoinHandle<ChildStdout> {
284 std::thread::spawn(move || {
285 let expected = ready_msg(stage);
286 let mut line = String::with_capacity(expected.len() + 1);
287
288 let mut buf = BufReader::new(stdout);
289 let res = buf.read_line(&mut line).map_err(|e| e.into()).and_then(|_| {
290 if line.as_str().trim() == expected.as_str() {
291 Ok(())
292 } else {
293 Err(anyhow!("unexpected first line: {}", line))
294 }
295 });
296
297 let _ = res_tx.send(res);
298 buf.into_inner()
299 })
300 }
301
302 let (stable_tx, stable_rx) = bounded(0);
303 let stable_hdl = inner(stable_tx, stage, stdout);
304 let wait = stable_timeout.take().map(after).unwrap_or_else(never);
305
306 select! {
307 recv(stable_rx) -> ready_res => {
308 ready_res.context("stable chan broken")?.context("wait for stable")?;
309 info!("producer ready");
310 let stdout = stable_hdl.join().map_err(|_| anyhow!("wait for stable handle to be joined"))?;
311 Ok(stdout)
312 },
313
314 recv(wait) -> _ => {
315 Err(anyhow!("timeout exceeded before child get ready"))
316 }
317 }
318}
319
/// A `Processor` that forwards each task as one JSON line to a child process and
/// blocks until the response with the matching id comes back.
pub struct Producer<T: Task, HP, HF> {
    /// Source of request ids; the first id handed out is 1.
    next_id: AtomicU64,
    /// The child process and its stdin. Also referenced by the response-handling
    /// thread started in `ProducerBuilder::spawn`.
    inner: Arc<Mutex<ProducerInner>>,
    /// Prepare/finalize callbacks run around each request.
    hooks: Hooks<HP, HF>,
    /// Requests awaiting a response, keyed by request id.
    in_flight_requests: InflightRequests<T::Output>,
}
328
329impl<T, HP, HF> Producer<T, HP, HF>
330where
331 T: Task,
332{
333 pub fn child_pid(&self) -> u32 {
335 self.inner.lock().unwrap().child_id()
336 }
337
338 pub fn next_id(&self) -> u64 {
340 self.next_id.fetch_add(1, Ordering::Relaxed)
341 }
342
343 fn send(&self, req: &Request<T>) -> Result<()> {
344 let data = to_string(req).context("marshal request")?;
345 self.inner.lock().unwrap().write_data(data)
346 }
347}
348
/// The running child process together with the write end of its stdin.
///
/// The child's stdout is not kept here; it is handed to the response reader.
struct ProducerInner {
    /// Handle to the spawned child; killed and reaped when this value is dropped.
    child: Child,
    /// Pipe used to send newline-terminated requests to the child.
    child_stdin: ChildStdin,
}
353
354impl ProducerInner {
355 fn new(cmd: Command, stage: &'static str, stable_timeout: Option<Duration>) -> Result<(Self, ChildStdout)> {
356 let (child, child_stdin, child_stdout) = Self::create_child_and_wait_it(cmd, stage, stable_timeout)?;
357 Ok((Self { child, child_stdin }, child_stdout))
358 }
359
360 fn child_id(&self) -> u32 {
361 self.child.id()
362 }
363
364 fn write_data(&mut self, data: String) -> Result<()> {
365 writeln!(self.child_stdin, "{}", data).context("write request data to child process")?;
366 self.child_stdin.flush().context("flush data to child process")
367 }
368
369 fn restart_child(&mut self, cmd: Command, stage: &'static str, stable_timeout: Option<Duration>) -> Result<ChildStdout> {
371 info!("restart the child process: {:?}", cmd);
372 self.kill_child();
373
374 let (child, child_stdin, child_stdout) = Self::create_child_and_wait_it(cmd, stage, stable_timeout)?;
375 self.child = child;
376 self.child_stdin = child_stdin;
377
378 Ok(child_stdout)
379 }
380
381 fn create_child_and_wait_it(
382 mut cmd: Command,
383 stage: &'static str,
384 stable_timeout: Option<Duration>,
385 ) -> Result<(Child, ChildStdin, ChildStdout)> {
386 let mut child = cmd.spawn().context("spawn child process")?;
387 let child_stdin = child.stdin.take().context("child stdin lost")?;
388 let mut child_stdout = child.stdout.take().context("child stdout lost")?;
389 child_stdout = wait_for_stable(stage, child_stdout, stable_timeout).context("wait for child process stable")?;
390 Ok((child, child_stdin, child_stdout))
391 }
392
393 fn kill_child(&mut self) {
394 info!(pid = self.child.id(), "kill child");
395 let _ = self.child.kill();
396 let _ = self.child.wait();
397 }
398}
399
400impl Drop for ProducerInner {
401 fn drop(&mut self) {
402 self.kill_child()
403 }
404}
405
/// Requests awaiting a response from the child: request id -> sender that
/// delivers the response. The map sits behind `Arc<Mutex<..>>`, so clones share it.
#[derive(Debug, Default)]
struct InflightRequests<O>(Arc<Mutex<FnvHashMap<u64, Sender<Response<O>>>>>);
408
409impl<O> InflightRequests<O> {
410 pub fn new() -> Self {
411 Self(Default::default())
412 }
413
414 pub fn sent(&self, id: u64) -> Receiver<Response<O>> {
416 debug!("sent request: {}", id);
417 let (tx, rx) = bounded(0);
418 self.0.lock().unwrap().insert(id, tx);
419 rx
420 }
421
422 pub fn remove(&self, id: u64) {
423 self.0.lock().unwrap().remove(&id);
424 }
425
426 pub fn complete(&self, resp: Response<O>) {
428 if let Some(tx) = self.0.lock().unwrap().remove(&resp.id) {
429 let _ = tx.send(resp);
430 }
431 }
432
433 pub fn cancel_all(&self, err_msg: impl AsRef<str>) {
434 let mut inner = self.0.lock().unwrap();
435 let keys: Vec<_> = inner.keys().cloned().collect();
436 for id in keys {
437 if let Some(tx) = inner.remove(&id) {
438 let _ = tx.send(Response {
439 id,
440 err_msg: Some(err_msg.as_ref().to_string()),
441 output: None,
442 });
443 debug!("canceled request: {}", id);
444 }
445 }
446 }
447}
448
449impl<T> Clone for InflightRequests<T> {
450 fn clone(&self) -> Self {
451 Self(self.0.clone())
452 }
453}
454
/// Type-erased prepare hook; its signature matches the `HP` bound of `Producer`'s `Processor` impl.
pub type BoxedPrepareHook<T> = Box<dyn Fn(&Request<T>) -> Result<()> + Send + Sync>;
457
/// Type-erased finalize hook; its signature matches the `HF` bound of `Producer`'s `Processor` impl.
pub type BoxedFinalizeHook<T> = Box<dyn Fn(&Request<T>) + Send + Sync>;
460
461impl<T, HP, HF> Processor<T> for Producer<T, HP, HF>
462where
463 T: Task,
464 HP: Fn(&Request<T>) -> Result<()> + Send + Sync,
465 HF: Fn(&Request<T>) + Send + Sync,
466{
467 fn process(&self, task: T) -> Result<T::Output> {
468 let req = Request { id: self.next_id(), task };
469
470 if let Some(p) = self.hooks.prepare.as_ref() {
471 p(&req).context("prepare task")?;
472 };
473
474 let _defer = Defer(true, || {
475 if let Some(f) = self.hooks.finalize.as_ref() {
476 f(&req);
477 }
478 });
479
480 let rx = self.in_flight_requests.sent(req.id);
481 if let Err(e) = self.send(&req) {
482 self.in_flight_requests.remove(req.id);
483 return Err(e);
484 }
485
486 debug!("wait request: {}", req.id);
487 let mut output = rx.recv().map_err(|_| anyhow!("output channel broken"))?;
488 if let Some(err_msg) = output.err_msg.take() {
489 return Err(anyhow!(err_msg));
490 }
491 debug!("request done: {}", req.id);
492 output.output.take().context("output field lost")
493 }
494}
495
/// Scope guard: runs the closure when dropped, provided the flag is `true`.
struct Defer<F: FnMut()>(bool, F);

impl<F: FnMut()> Drop for Defer<F> {
    fn drop(&mut self) {
        let Defer(enabled, callback) = self;
        if *enabled {
            callback();
        }
    }
}
505
#[cfg(test)]
mod tests {
    use std::fs;

    use pretty_assertions::assert_eq;
    use tracing_test::traced_test;

    use super::{dump, dump_error_resp_env, dump_to_file, DumpType};

    #[test]
    fn test_dump_error_resp_env() {
        assert_eq!("DUMP_ERR_RESP_1234", dump_error_resp_env(1234));
    }

    /// Short payloads are logged verbatim. Longer ones are cut to at most 100
    /// bytes on a char boundary and suffixed with `...`.
    #[test]
    #[traced_test]
    fn test_dump_to_log() {
        let cases = vec![
            ("abcdefg".to_string(), format!("'{}'", "abcdefg")),
            ("一二三四五123".to_string(), format!("'{}'", "一二三四五123")),
            ("a".repeat(100), format!("'{}'", "a".repeat(100))),
            ("a".repeat(101), format!("'{}...'", "a".repeat(100))),
            ("a".repeat(200), format!("'{}...'", "a".repeat(100))),
            ("💗".repeat(100), format!("'{}...'", "💗".repeat(25))),
        ];

        for (data, expected_log) in cases {
            dump(&DumpType::ToLog, 1, &data);
            assert!(logs_contain(&expected_log));
        }
    }

    #[test]
    fn test_dump_to_file() {
        use tempfile::tempdir;

        let tmpdir = tempdir().expect("couldn't create temp dir");

        let dumpfile = dump_to_file(1, tmpdir.path(), "hello world".as_bytes());
        assert!(dumpfile.is_ok(), "dump_to_file: {:?}", dumpfile.err());
        let dumpfile = dumpfile.unwrap();
        assert_eq!(tmpdir.path(), dumpfile.parent().unwrap());
        assert_eq!("hello world", fs::read_to_string(dumpfile).unwrap());
    }

    /// The target directory is created on demand.
    #[test]
    fn test_dump_to_file_when_dir_not_exist() {
        use tempfile::tempdir;

        let tmpdir = tempdir().expect("couldn't create temp dir");
        let not_exist_dir = tmpdir.path().join("test");

        let dumpfile = dump_to_file(1, &not_exist_dir, "hello world".as_bytes());
        assert!(dumpfile.is_ok(), "dump_to_file: {:?}", dumpfile.err());
        let dumpfile = dumpfile.unwrap();
        assert_eq!(not_exist_dir, dumpfile.parent().unwrap());
        assert_eq!("hello world", fs::read_to_string(dumpfile).unwrap());
    }

    /// A path that exists but is a regular file is rejected.
    #[test]
    fn test_dump_to_file_when_not_dir() {
        use tempfile::tempdir;
        let tmpdir = tempdir().expect("couldn't create temp dir");
        let tmpfile = tmpdir.path().join("test.json");
        fs::write(&tmpfile, "oops").unwrap();

        assert!(dump_to_file(1, &tmpfile, "hello world".as_bytes()).is_err());
    }
}
569}