dagger_core/
cli_session.rs1use std::{fs::canonicalize, path::PathBuf, process::Stdio, sync::Arc};
2
3use tokio::io::AsyncBufReadExt;
4
5use crate::{config::Config, connect_params::ConnectParams};
6
7#[derive(Clone, Debug)]
8pub struct CliSession {
9 inner: Arc<InnerCliSession>,
10}
11
12impl CliSession {
13 pub fn new() -> Self {
14 Self {
15 inner: Arc::new(InnerCliSession {}),
16 }
17 }
18
19 pub async fn connect(
20 &self,
21 config: &Config,
22 cli_path: &PathBuf,
23 ) -> eyre::Result<(ConnectParams, tokio::process::Child)> {
24 self.inner.connect(config, cli_path).await
25 }
26}
27
/// Field-less implementation type behind `CliSession`.
///
/// It carries no data of its own; it exists so the public handle can share a
/// single instance through an `Arc`.
#[derive(Debug)]
struct InnerCliSession {}
30
31impl InnerCliSession {
32 pub async fn connect(
33 &self,
34 config: &Config,
35 cli_path: &PathBuf,
36 ) -> eyre::Result<(ConnectParams, tokio::process::Child)> {
37 let proc = self.start(config, cli_path)?;
38 let params = self.get_conn(proc, config).await?;
39 Ok(params)
40 }
41
42 fn start(&self, config: &Config, cli_path: &PathBuf) -> eyre::Result<tokio::process::Child> {
43 let mut args: Vec<String> = vec!["session".into()];
44 if let Some(workspace) = &config.workdir_path {
45 let abs_path = canonicalize(workspace)?;
46 args.extend(["--workdir".into(), abs_path.to_string_lossy().to_string()])
47 }
48 if let Some(config_path) = &config.config_path {
49 let abs_path = canonicalize(config_path)?;
50 args.extend(["--project".into(), abs_path.to_string_lossy().to_string()])
51 }
52
53 let proc = tokio::process::Command::new(
54 cli_path
55 .to_str()
56 .ok_or(eyre::anyhow!("could not get string from path"))?,
57 )
58 .args(args.as_slice())
59 .stdin(Stdio::piped())
60 .stdout(Stdio::piped())
61 .stderr(Stdio::piped())
62 .spawn()?;
63
64 return Ok(proc);
67 }
68
69 async fn get_conn(
70 &self,
71 mut proc: tokio::process::Child,
72 config: &Config,
73 ) -> eyre::Result<(ConnectParams, tokio::process::Child)> {
74 let stdout = proc
75 .stdout
76 .take()
77 .ok_or(eyre::anyhow!("could not acquire stdout from child process"))?;
78
79 let stderr = proc
80 .stderr
81 .take()
82 .ok_or(eyre::anyhow!("could not acquire stderr from child process"))?;
83
84 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
85
86 let logger = config.logger.as_ref().map(|p| p.clone());
87 tokio::spawn(async move {
88 let mut stdout_bufr = tokio::io::BufReader::new(stdout).lines();
89 while let Ok(Some(line)) = stdout_bufr.next_line().await {
90 if let Ok(conn) = serde_json::from_str::<ConnectParams>(&line) {
91 sender.send(conn).await.unwrap();
92 continue;
93 }
94
95 if let Some(logger) = &logger {
96 logger.stdout(&line).unwrap();
97 }
98 }
99 });
100
101 let logger = config.logger.as_ref().map(|p| p.clone());
102 tokio::spawn(async move {
103 let mut stderr_bufr = tokio::io::BufReader::new(stderr).lines();
104 while let Ok(Some(line)) = stderr_bufr.next_line().await {
105 if let Some(logger) = &logger {
106 logger.stdout(&line).unwrap();
107 }
108 }
109 });
110
111 let conn = receiver.recv().await.ok_or(eyre::anyhow!(
112 "could not receive ok signal from dagger-engine"
113 ))?;
114
115 Ok((conn, proc))
116 }
117}