vc_processors/core/ext/
producer.rs

1use std::collections::HashMap;
2use std::env::{self, vars};
3use std::io::{BufRead, BufReader, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
6use std::sync::{
7    atomic::{AtomicU64, Ordering},
8    Arc, Mutex,
9};
10use std::time::Duration;
11use std::{fs, io, thread};
12
13use anyhow::{anyhow, Context, Result};
14use crossbeam_channel::{after, bounded, never, select, Receiver, Sender};
15use fnv::FnvHashMap;
16use serde_json::{from_str, to_string};
17use tracing::{debug, error, info};
18use uuid::Uuid;
19
20use super::{ready_msg, Request, Response};
21use crate::core::{Processor, Task};
22
/// Builds the name of the environment variable that selects where malformed
/// responses of the child with the given `pid` are dumped.
///
/// The name has the form `DUMP_ERR_RESP_<pid>`.
pub fn dump_error_resp_env(pid: u32) -> String {
    let mut key = String::from("DUMP_ERR_RESP_");
    key.push_str(&pid.to_string());
    key
}
27
28fn start_response_handler<T: Task>(child_pid: u32, stdout: ChildStdout, in_flight_requests: InflightRequests<T::Output>) -> Result<()> {
29    let mut reader = BufReader::new(stdout);
30    let mut line_buf = String::new();
31
32    loop {
33        line_buf.clear();
34
35        let size = reader.read_line(&mut line_buf).context("read line from stdout")?;
36        if size == 0 {
37            error!("child exited");
38            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "child process exit").into());
39        }
40
41        let resp: Response<T::Output> = match from_str(line_buf.as_str()) {
42            Ok(r) => r,
43            Err(_) => {
44                dump(&DumpType::from_env(child_pid), child_pid, line_buf.as_str());
45                continue;
46            }
47        };
48
49        debug!(id = resp.id, size, "response received");
50        // this should not be blocked
51        in_flight_requests.complete(resp);
52    }
53}
54
55/// Dump type of the error format response of the processor
56#[derive(Debug, Clone)]
57enum DumpType {
58    /// Dump error format response fragments to the log
59    ToLog,
60    /// Dump error format response to the file
61    ToFile(PathBuf),
62}
63
64impl DumpType {
65    fn from_env(pid: u32) -> Self {
66        match env::var(dump_error_resp_env(pid)) {
67            Ok(path) => Self::ToFile(path.into()),
68            Err(_) => Self::ToLog,
69        }
70    }
71}
72
73fn dump(dt: &DumpType, child_pid: u32, data: &str) {
74    #[inline]
75    fn truncate(data: &str) -> String {
76        const TRUNCATE_SIZE: usize = 100;
77
78        if data.len() <= TRUNCATE_SIZE {
79            return data.to_string();
80        }
81
82        let trunc_len = (0..TRUNCATE_SIZE + 1).rposition(|index| data.is_char_boundary(index)).unwrap_or(0);
83        format!("{}...", &data[..trunc_len])
84    }
85
86    match dt {
87        DumpType::ToLog => {
88            error!(child_pid = child_pid, "failed to unmarshal response string: '{}'.", truncate(data));
89        }
90        DumpType::ToFile(dir) => match dump_to_file(child_pid, dir, data.as_bytes()) {
91            Ok(dump_file_path) => {
92                error!(
93                    child_pid = child_pid,
94                    "failed to unmarshal response string. dump file '{}' generated.",
95                    dump_file_path.display()
96                )
97            }
98            Err(e) => {
99                error!(
100                    child_pid = child_pid,
101                    "failed to unmarshal response string; failed to generate dump file: '{}'.", e
102                );
103            }
104        },
105    }
106}
107
108fn dump_to_file(child_pid: u32, dir: impl AsRef<Path>, data: &[u8]) -> Result<PathBuf> {
109    #[inline]
110    fn ensure_dir(dir: &Path) -> Result<()> {
111        if !dir.exists() {
112            fs::create_dir_all(dir).with_context(|| format!("create directory '{}' for dump files", dir.display()))?;
113        } else if !dir.is_dir() {
114            return Err(anyhow!("'{}' is not directory", dir.display()));
115        }
116        Ok(())
117    }
118
119    let dir = dir.as_ref();
120    ensure_dir(dir)?;
121    let filename = format!("ext-processor-err-resp-{}-{}.json", child_pid, Uuid::new_v4().as_simple());
122    let path = dir.join(filename);
123    fs::write(&path, data)?;
124    Ok(path)
125}
126
/// Optional callbacks a [`Producer`] runs around each processed task.
pub struct Hooks<P, F> {
    /// Called with the request before it is sent to the child; an error
    /// returned here aborts the processing of that task.
    pub prepare: Option<P>,
    /// Called with the request right before `Processor::process` returns.
    pub finalize: Option<F>,
}
131
132/// Builder for Producer
133pub struct ProducerBuilder<HP, HF> {
134    bin: PathBuf,
135    args: Vec<String>,
136    envs: HashMap<String, String>,
137
138    inherit_envs: bool,
139    stable_timeout: Option<Duration>,
140    hooks: Hooks<HP, HF>,
141    auto_restart: bool,
142}
143
144impl<HP, HF> ProducerBuilder<HP, HF> {
145    /// Construct a new builder with the given binary path & args.
146    pub fn new(bin: PathBuf, args: Vec<String>) -> Self {
147        ProducerBuilder {
148            bin,
149            args,
150            envs: HashMap::new(),
151            inherit_envs: true,
152            stable_timeout: None,
153            hooks: Hooks {
154                prepare: None,
155                finalize: None,
156            },
157            auto_restart: false,
158        }
159    }
160
161    /// Set if we should inherit envs from the parent process, default is true.
162    pub fn inherit_envs(mut self, yes: bool) -> Self {
163        self.inherit_envs = yes;
164        self
165    }
166
167    /// Set a pair of env name & value for the child.
168    pub fn env(mut self, name: String, value: String) -> Self {
169        self.envs.insert(name, value);
170        self
171    }
172
173    /// Set the timeout before we wait for the child process to be stable, default is None, thus we
174    /// would block util the child gives the ready message.
175    pub fn stable_timeout(mut self, timeout: Duration) -> Self {
176        self.stable_timeout.replace(timeout);
177        self
178    }
179
180    /// Set if the child has a preferred numa node.
181    #[cfg(feature = "numa")]
182    pub fn numa_preferred(self, node: std::os::raw::c_int) -> Self {
183        self.env(crate::sys::numa::ENV_NUMA_PREFERRED.to_string(), node.to_string())
184    }
185
186    /// Set a prepare hook, which will be called before the Producer send the task to the child.
187    pub fn hook_prepare(mut self, f: HP) -> Self {
188        self.hooks.prepare.replace(f);
189        self
190    }
191
192    /// Set a finalize hook, which will be called before the Processor::process returns.
193    pub fn hook_finalize(mut self, f: HF) -> Self {
194        self.hooks.finalize.replace(f);
195        self
196    }
197
198    /// Set auto restart for child process
199    pub fn auto_restart(mut self, auto_restart: bool) -> Self {
200        self.auto_restart = auto_restart;
201        self
202    }
203
204    /// Set cpuset for child process
205    pub fn cpuset<S: Into<String>>(mut self, cgname: S, cpuset: S) -> Self {
206        self = self.env(crate::sys::cgroup::ENV_CGROUP_NAME.to_string(), cgname.into());
207        self.env(crate::sys::cgroup::ENV_CGROUP_CPUSET.to_string(), cpuset.into())
208    }
209
210    /// Build a Producer with the given options.
211    pub fn spawn<T: Task>(self) -> Result<Producer<T, HP, HF>> {
212        let ProducerBuilder {
213            bin,
214            args,
215            mut envs,
216            inherit_envs,
217            stable_timeout,
218            hooks,
219            auto_restart,
220        } = self;
221
222        if inherit_envs {
223            envs.extend(vars());
224        }
225
226        let cmd = move || {
227            let mut cmd = Command::new(&bin);
228            cmd.args(args.clone())
229                .envs(envs.clone())
230                .stdin(Stdio::piped())
231                .stdout(Stdio::piped())
232                .stderr(Stdio::inherit());
233            cmd
234        };
235
236        let (producer_inner, mut child_stdout) = ProducerInner::new(cmd(), T::STAGE, stable_timeout).context("create producer inner")?;
237        let child_pid = producer_inner.child_id();
238        let producer_inner = Arc::new(Mutex::new(producer_inner));
239        let in_flight_requests = InflightRequests::new();
240
241        let producer = Producer {
242            next_id: AtomicU64::new(1),
243            inner: producer_inner.clone(),
244            hooks,
245            in_flight_requests: in_flight_requests.clone(),
246        };
247
248        thread::spawn(move || {
249            loop {
250                if let Err(e) =
251                    start_response_handler::<T>(child_pid, child_stdout, in_flight_requests.clone()).context("start response handler")
252                {
253                    error!(err=?e, "failed to start response handler. pid: {}", child_pid);
254                }
255
256                // child process exist
257                // cancel all in flight requests
258                in_flight_requests.cancel_all(format!("child process exited: {}", T::STAGE));
259
260                if !auto_restart {
261                    break;
262                }
263                thread::sleep(Duration::from_secs(3));
264
265                let mut inner = producer_inner.lock().unwrap();
266                match inner.restart_child(cmd(), T::STAGE, stable_timeout) {
267                    Ok(new_child_stdout) => {
268                        child_stdout = new_child_stdout;
269                    }
270                    Err(e) => {
271                        error!(err=?e, "unable to restart child process");
272                        break;
273                    }
274                }
275            }
276        });
277
278        Ok(producer)
279    }
280}
281
282fn wait_for_stable(stage: &'static str, stdout: ChildStdout, mut stable_timeout: Option<Duration>) -> Result<ChildStdout> {
283    fn inner(res_tx: Sender<Result<()>>, stage: &'static str, stdout: ChildStdout) -> thread::JoinHandle<ChildStdout> {
284        std::thread::spawn(move || {
285            let expected = ready_msg(stage);
286            let mut line = String::with_capacity(expected.len() + 1);
287
288            let mut buf = BufReader::new(stdout);
289            let res = buf.read_line(&mut line).map_err(|e| e.into()).and_then(|_| {
290                if line.as_str().trim() == expected.as_str() {
291                    Ok(())
292                } else {
293                    Err(anyhow!("unexpected first line: {}", line))
294                }
295            });
296
297            let _ = res_tx.send(res);
298            buf.into_inner()
299        })
300    }
301
302    let (stable_tx, stable_rx) = bounded(0);
303    let stable_hdl = inner(stable_tx, stage, stdout);
304    let wait = stable_timeout.take().map(after).unwrap_or_else(never);
305
306    select! {
307        recv(stable_rx) -> ready_res => {
308            ready_res.context("stable chan broken")?.context("wait for stable")?;
309            info!("producer ready");
310            let stdout = stable_hdl.join().map_err(|_| anyhow!("wait for stable handle to be joined"))?;
311            Ok(stdout)
312        },
313
314        recv(wait) -> _ => {
315            Err(anyhow!("timeout exceeded before child get ready"))
316        }
317    }
318}
319
320/// Producer sends tasks to the child, and waits for the responses.
321/// It impl Processor.
322pub struct Producer<T: Task, HP, HF> {
323    next_id: AtomicU64,
324    inner: Arc<Mutex<ProducerInner>>,
325    hooks: Hooks<HP, HF>,
326    in_flight_requests: InflightRequests<T::Output>,
327}
328
329impl<T, HP, HF> Producer<T, HP, HF>
330where
331    T: Task,
332{
333    /// Returns the child's process id.
334    pub fn child_pid(&self) -> u32 {
335        self.inner.lock().unwrap().child_id()
336    }
337
338    /// Returns the child's process id.
339    pub fn next_id(&self) -> u64 {
340        self.next_id.fetch_add(1, Ordering::Relaxed)
341    }
342
343    fn send(&self, req: &Request<T>) -> Result<()> {
344        let data = to_string(req).context("marshal request")?;
345        self.inner.lock().unwrap().write_data(data)
346    }
347}
348
349struct ProducerInner {
350    child: Child,
351    child_stdin: ChildStdin,
352}
353
354impl ProducerInner {
355    fn new(cmd: Command, stage: &'static str, stable_timeout: Option<Duration>) -> Result<(Self, ChildStdout)> {
356        let (child, child_stdin, child_stdout) = Self::create_child_and_wait_it(cmd, stage, stable_timeout)?;
357        Ok((Self { child, child_stdin }, child_stdout))
358    }
359
360    fn child_id(&self) -> u32 {
361        self.child.id()
362    }
363
364    fn write_data(&mut self, data: String) -> Result<()> {
365        writeln!(self.child_stdin, "{}", data).context("write request data to child process")?;
366        self.child_stdin.flush().context("flush data to child process")
367    }
368
369    /// restart_child restarts the child process
370    fn restart_child(&mut self, cmd: Command, stage: &'static str, stable_timeout: Option<Duration>) -> Result<ChildStdout> {
371        info!("restart the child process: {:?}", cmd);
372        self.kill_child();
373
374        let (child, child_stdin, child_stdout) = Self::create_child_and_wait_it(cmd, stage, stable_timeout)?;
375        self.child = child;
376        self.child_stdin = child_stdin;
377
378        Ok(child_stdout)
379    }
380
381    fn create_child_and_wait_it(
382        mut cmd: Command,
383        stage: &'static str,
384        stable_timeout: Option<Duration>,
385    ) -> Result<(Child, ChildStdin, ChildStdout)> {
386        let mut child = cmd.spawn().context("spawn child process")?;
387        let child_stdin = child.stdin.take().context("child stdin lost")?;
388        let mut child_stdout = child.stdout.take().context("child stdout lost")?;
389        child_stdout = wait_for_stable(stage, child_stdout, stable_timeout).context("wait for child process stable")?;
390        Ok((child, child_stdin, child_stdout))
391    }
392
393    fn kill_child(&mut self) {
394        info!(pid = self.child.id(), "kill child");
395        let _ = self.child.kill();
396        let _ = self.child.wait();
397    }
398}
399
400impl Drop for ProducerInner {
401    fn drop(&mut self) {
402        self.kill_child()
403    }
404}
405
406#[derive(Debug, Default)]
407struct InflightRequests<O>(Arc<Mutex<FnvHashMap<u64, Sender<Response<O>>>>>);
408
409impl<O> InflightRequests<O> {
410    pub fn new() -> Self {
411        Self(Default::default())
412    }
413
414    /// sent represents the specified request data has sent to the target.
415    pub fn sent(&self, id: u64) -> Receiver<Response<O>> {
416        debug!("sent request: {}", id);
417        let (tx, rx) = bounded(0);
418        self.0.lock().unwrap().insert(id, tx);
419        rx
420    }
421
422    pub fn remove(&self, id: u64) {
423        self.0.lock().unwrap().remove(&id);
424    }
425
426    /// complete represents the specified response is ready
427    pub fn complete(&self, resp: Response<O>) {
428        if let Some(tx) = self.0.lock().unwrap().remove(&resp.id) {
429            let _ = tx.send(resp);
430        }
431    }
432
433    pub fn cancel_all(&self, err_msg: impl AsRef<str>) {
434        let mut inner = self.0.lock().unwrap();
435        let keys: Vec<_> = inner.keys().cloned().collect();
436        for id in keys {
437            if let Some(tx) = inner.remove(&id) {
438                let _ = tx.send(Response {
439                    id,
440                    err_msg: Some(err_msg.as_ref().to_string()),
441                    output: None,
442                });
443                debug!("canceled request: {}", id);
444            }
445        }
446    }
447}
448
449impl<T> Clone for InflightRequests<T> {
450    fn clone(&self) -> Self {
451        Self(self.0.clone())
452    }
453}
454
/// Boxed prepare-hook type. Can be used to avoid type-annotation problems
/// when a concrete type has to be named for the prepare hook.
pub type BoxedPrepareHook<T> = Box<dyn Fn(&Request<T>) -> Result<()> + Send + Sync>;
457
/// Boxed finalize-hook type. Can be used to avoid type-annotation problems
/// when a concrete type has to be named for the finalize hook.
pub type BoxedFinalizeHook<T> = Box<dyn Fn(&Request<T>) + Send + Sync>;
460
461impl<T, HP, HF> Processor<T> for Producer<T, HP, HF>
462where
463    T: Task,
464    HP: Fn(&Request<T>) -> Result<()> + Send + Sync,
465    HF: Fn(&Request<T>) + Send + Sync,
466{
467    fn process(&self, task: T) -> Result<T::Output> {
468        let req = Request { id: self.next_id(), task };
469
470        if let Some(p) = self.hooks.prepare.as_ref() {
471            p(&req).context("prepare task")?;
472        };
473
474        let _defer = Defer(true, || {
475            if let Some(f) = self.hooks.finalize.as_ref() {
476                f(&req);
477            }
478        });
479
480        let rx = self.in_flight_requests.sent(req.id);
481        if let Err(e) = self.send(&req) {
482            self.in_flight_requests.remove(req.id);
483            return Err(e);
484        }
485
486        debug!("wait request: {}", req.id);
487        let mut output = rx.recv().map_err(|_| anyhow!("output channel broken"))?;
488        if let Some(err_msg) = output.err_msg.take() {
489            return Err(anyhow!(err_msg));
490        }
491        debug!("request done: {}", req.id);
492        output.output.take().context("output field lost")
493    }
494}
495
/// Scope guard: calls the stored closure on drop, but only while the flag in
/// the first field is `true`.
struct Defer<F: FnMut()>(bool, F);

impl<F: FnMut()> Drop for Defer<F> {
    fn drop(&mut self) {
        let Defer(armed, action) = self;
        if !*armed {
            return;
        }
        action();
    }
}
505
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::Path;

    use pretty_assertions::assert_eq;
    use tempfile::tempdir;
    use tracing_test::traced_test;

    use super::{dump, dump_to_file, DumpType};

    /// Dumps a fixed payload into `dir` and checks both the location and the
    /// content of the generated file.
    fn assert_dumped_into(dir: &Path) {
        let dumpfile = dump_to_file(1, dir, "hello world".as_bytes());
        assert!(dumpfile.is_ok(), "dump_to_file: {:?}", dumpfile.err());
        let dumpfile = dumpfile.unwrap();
        assert_eq!(dir, dumpfile.parent().unwrap());
        assert_eq!("hello world", fs::read_to_string(dumpfile).unwrap());
    }

    /// Logging a malformed response keeps short inputs intact and truncates
    /// long ones on a char boundary.
    #[test]
    #[traced_test]
    fn test_dump_to_log() {
        let cases = [
            ("abcdefg".to_string(), format!("'{}'", "abcdefg")),
            ("一二三四五123".to_string(), format!("'{}'", "一二三四五123")),
            ("a".repeat(100), format!("'{}'", "a".repeat(100))),
            ("a".repeat(101), format!("'{}...'", "a".repeat(100))),
            ("a".repeat(200), format!("'{}...'", "a".repeat(100))),
            ("💗".repeat(100), format!("'{}...'", "💗".repeat(25))),
        ];

        for (data, expected_log) in &cases {
            dump(&DumpType::ToLog, 1, data);
            assert!(logs_contain(expected_log));
        }
    }

    /// An existing directory receives the dump file.
    #[test]
    fn test_dump_to_file() {
        let tmpdir = tempdir().expect("couldn't create temp dir");

        assert_dumped_into(tmpdir.path());
    }

    /// A missing directory is created on demand.
    #[test]
    fn test_dump_to_file_when_dir_not_exist() {
        let tmpdir = tempdir().expect("couldn't create temp dir");
        let not_exist_dir = tmpdir.path().join("test");

        assert_dumped_into(&not_exist_dir);
    }

    /// A regular file in place of the directory is rejected.
    #[test]
    fn test_dump_to_file_when_not_dir() {
        let tmpdir = tempdir().expect("couldn't create temp dir");
        let tmpfile = tmpdir.path().join("test.json");
        fs::write(&tmpfile, "oops").unwrap();

        assert!(dump_to_file(1, &tmpfile, "hello world".as_bytes()).is_err());
    }
}