dagger_core/
cli_session.rs

1use std::{fs::canonicalize, path::PathBuf, process::Stdio, sync::Arc};
2
3use tokio::io::AsyncBufReadExt;
4
5use crate::{config::Config, connect_params::ConnectParams};
6
7#[derive(Clone, Debug)]
8pub struct CliSession {
9    inner: Arc<InnerCliSession>,
10}
11
12impl CliSession {
13    pub fn new() -> Self {
14        Self {
15            inner: Arc::new(InnerCliSession {}),
16        }
17    }
18
19    pub async fn connect(
20        &self,
21        config: &Config,
22        cli_path: &PathBuf,
23    ) -> eyre::Result<(ConnectParams, tokio::process::Child)> {
24        self.inner.connect(config, cli_path).await
25    }
26}
27
28#[derive(Debug)]
29struct InnerCliSession {}
30
31impl InnerCliSession {
32    pub async fn connect(
33        &self,
34        config: &Config,
35        cli_path: &PathBuf,
36    ) -> eyre::Result<(ConnectParams, tokio::process::Child)> {
37        let proc = self.start(config, cli_path)?;
38        let params = self.get_conn(proc, config).await?;
39        Ok(params)
40    }
41
42    fn start(&self, config: &Config, cli_path: &PathBuf) -> eyre::Result<tokio::process::Child> {
43        let mut args: Vec<String> = vec!["session".into()];
44        if let Some(workspace) = &config.workdir_path {
45            let abs_path = canonicalize(workspace)?;
46            args.extend(["--workdir".into(), abs_path.to_string_lossy().to_string()])
47        }
48        if let Some(config_path) = &config.config_path {
49            let abs_path = canonicalize(config_path)?;
50            args.extend(["--project".into(), abs_path.to_string_lossy().to_string()])
51        }
52
53        let proc = tokio::process::Command::new(
54            cli_path
55                .to_str()
56                .ok_or(eyre::anyhow!("could not get string from path"))?,
57        )
58        .args(args.as_slice())
59        .stdin(Stdio::piped())
60        .stdout(Stdio::piped())
61        .stderr(Stdio::piped())
62        .spawn()?;
63
64        //TODO: Add retry mechanism
65
66        return Ok(proc);
67    }
68
69    async fn get_conn(
70        &self,
71        mut proc: tokio::process::Child,
72        config: &Config,
73    ) -> eyre::Result<(ConnectParams, tokio::process::Child)> {
74        let stdout = proc
75            .stdout
76            .take()
77            .ok_or(eyre::anyhow!("could not acquire stdout from child process"))?;
78
79        let stderr = proc
80            .stderr
81            .take()
82            .ok_or(eyre::anyhow!("could not acquire stderr from child process"))?;
83
84        let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
85
86        let logger = config.logger.as_ref().map(|p| p.clone());
87        tokio::spawn(async move {
88            let mut stdout_bufr = tokio::io::BufReader::new(stdout).lines();
89            while let Ok(Some(line)) = stdout_bufr.next_line().await {
90                if let Ok(conn) = serde_json::from_str::<ConnectParams>(&line) {
91                    sender.send(conn).await.unwrap();
92                    continue;
93                }
94
95                if let Some(logger) = &logger {
96                    logger.stdout(&line).unwrap();
97                }
98            }
99        });
100
101        let logger = config.logger.as_ref().map(|p| p.clone());
102        tokio::spawn(async move {
103            let mut stderr_bufr = tokio::io::BufReader::new(stderr).lines();
104            while let Ok(Some(line)) = stderr_bufr.next_line().await {
105                if let Some(logger) = &logger {
106                    logger.stdout(&line).unwrap();
107                }
108            }
109        });
110
111        let conn = receiver.recv().await.ok_or(eyre::anyhow!(
112            "could not receive ok signal from dagger-engine"
113        ))?;
114
115        Ok((conn, proc))
116    }
117}