concord_client/
api_client.rs

1use std::borrow::Cow;
2use std::path::PathBuf;
3
4use http::{
5    HeaderValue,
6    header::{self, HeaderMap},
7};
8use reqwest::multipart;
9use url::Url;
10
11use crate::model::{ApiToken, ProcessEntry, StartProcessResponse};
12use crate::{
13    api_err, api_error,
14    error::ApiError,
15    model::{
16        AgentId, LogSegmentId, LogSegmentOperationResponse, LogSegmentRequest, LogSegmentUpdateRequest,
17        ProcessId, ProcessStatus, SessionToken, USER_AGENT_VALUE,
18    },
19};
20
21macro_rules! get {
22    ($client:expr, $url_fmt:expr $(, $args:expr)*) => {
23        $client.parent.client
24            .get($client.config.base_url.join(&format!($url_fmt, $($args),*)).expect("valid url"))
25    };
26}
27
28macro_rules! post {
29    ($client:expr, $url_fmt:expr $(, $args:expr)*) => {
30        $client
31            .parent
32            .client
33            .post($client.config.base_url.join(&format!($url_fmt, $($args),*)).expect("valid url"))
34    };
35}
36
37macro_rules! post_json {
38    ($client:expr, $body:expr, $url_fmt:expr $(, $args:expr)*) => {
39        post!($client, $url_fmt $(, $args),*)
40        .header(header::CONTENT_TYPE, "application/json")
41        .json($body)
42    };
43}
44
45pub struct Config {
46    pub base_url: Url,
47    pub session_token: Option<SessionToken>,
48    pub api_token: Option<ApiToken>,
49    pub temp_dir: Option<PathBuf>,
50}
51
52pub struct ApiClient {
53    config: Config,
54    client: reqwest::Client,
55}
56
57impl ApiClient {
58    pub fn new(config: Config) -> Result<Self, ApiError> {
59        let mut default_headers = HeaderMap::new();
60        default_headers.insert(header::USER_AGENT, HeaderValue::from_static(USER_AGENT_VALUE));
61
62        if let Some(session_token) = &config.session_token {
63            default_headers.insert(
64                "X-Concord-SessionToken",
65                HeaderValue::try_from(session_token).map_err(|e| api_error!("Invalid session_token: {e}"))?,
66            );
67        } else if let Some(api_token) = &config.api_token {
68            default_headers.insert(
69                "Authorization",
70                HeaderValue::try_from(api_token).map_err(|e| api_error!("Invalid api_token: {e}"))?,
71            );
72        }
73
74        let client = reqwest::ClientBuilder::new()
75            .default_headers(default_headers)
76            .build()?;
77
78        Ok(ApiClient { config, client })
79    }
80
81    pub fn process_api(&self) -> ProcessApiClient {
82        ProcessApiClient {
83            config: &self.config,
84            parent: self,
85        }
86    }
87}
88
89pub struct ProcessApiClient<'a> {
90    config: &'a Config,
91    parent: &'a ApiClient,
92}
93
94impl std::fmt::Debug for ProcessApiClient<'_> {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("ProcessApiClient")
97            .field("config.base_url", &self.config.base_url)
98            .finish()
99    }
100}
101
102impl ProcessApiClient<'_> {
103    pub async fn start_process<I, T>(&self, input: I) -> Result<StartProcessResponse, ApiError>
104    where
105        I: IntoIterator<Item = (T, multipart::Part)>,
106        T: Into<Cow<'static, str>>,
107    {
108        let mut form = multipart::Form::new();
109        for (k, v) in input.into_iter() {
110            form = form.part(k, v);
111        }
112
113        let resp = post!(self, "/api/v1/process").multipart(form).send().await?;
114
115        if resp.status().is_success() {
116            Ok(resp.json().await?)
117        } else {
118            api_err!("Failed to start a process: {}", resp.status())
119        }
120    }
121
122    pub async fn get_process(&self, process_id: &ProcessId) -> Result<ProcessEntry, ApiError> {
123        let resp = get!(self, "/api/v1/process/{process_id}").send().await?;
124
125        if resp.status().is_success() {
126            Ok(resp.json().await?)
127        } else {
128            api_err!("Failed to get process: {}", resp.status())
129        }
130    }
131
132    pub async fn update_status(
133        &self,
134        process_id: ProcessId,
135        agent_id: AgentId,
136        status: ProcessStatus,
137    ) -> Result<(), ApiError> {
138        let req = format!("{status}");
139        let resp = post!(self, "/api/v1/process/{process_id}/status")
140            .query(&[("agent_id", agent_id)])
141            .header(header::CONTENT_TYPE, "text/plain")
142            .body(req)
143            .send()
144            .await?;
145
146        if resp.status().is_success() {
147            Ok(())
148        } else {
149            api_err!("Failed to update status: {}", resp.status())
150        }
151    }
152
153    pub async fn download_state(&self, process_id: ProcessId) -> Result<PathBuf, ApiError> {
154        let resp = get!(self, "/api/v1/process/{process_id}/state/snapshot")
155            .send()
156            .await?;
157
158        if !resp.status().is_success() {
159            return api_err!("Failed to download process state: {}", resp.status());
160        }
161
162        let path = self.new_state_file_path(process_id.to_string(), ".zip").await?;
163        let mut file = tokio::fs::File::create(&path)
164            .await
165            .map_err(|e| api_error!("Failed to create a temporary file: {}", e))?;
166
167        use futures::StreamExt;
168        use tokio::io::AsyncWriteExt;
169        let mut stream = resp.bytes_stream();
170        while let Some(chunk) = stream.next().await {
171            let chunk = chunk?;
172            file.write_all(&chunk)
173                .await
174                .map_err(|e| api_error!("Failed to write to a temporary file: {}", e))?;
175        }
176
177        Ok(path)
178    }
179
180    async fn new_state_file_path<PREFIX: AsRef<str>, SUFFIX: AsRef<str>>(
181        &self,
182        prefix: PREFIX,
183        suffix: SUFFIX,
184    ) -> Result<PathBuf, ApiError> {
185        let prefix = prefix.as_ref();
186        let suffix = suffix.as_ref();
187
188        let temp_dir = self
189            .config
190            .temp_dir
191            .clone()
192            .ok_or_else(|| api_error!("config.temp_dir is not set"))?;
193
194        let mut path = temp_dir.join(format!("{prefix}{suffix}"));
195        let mut n = 0;
196        loop {
197            let exists = path.try_exists().map_err(|e| api_error!("IO error: {}", e))?;
198            if !exists {
199                break;
200            } else {
201                path = temp_dir.join(format!("{prefix}-{n}{suffix}"));
202                n += 1;
203            }
204        }
205
206        Ok(path)
207    }
208
209    pub async fn create_log_segment(
210        &self,
211        process_id: ProcessId,
212        req: &LogSegmentRequest,
213    ) -> Result<LogSegmentId, ApiError> {
214        let resp = post_json!(self, req, "/api/v2/process/{process_id}/log/segment")
215            .send()
216            .await?;
217
218        if resp.status().is_success() {
219            let resp = resp.json::<LogSegmentOperationResponse>().await?;
220            Ok(resp.id)
221        } else {
222            api_err!("Failed to update status: {}", resp.status())
223        }
224    }
225
226    pub async fn update_log_segment(
227        &self,
228        process_id: ProcessId,
229        segment_id: LogSegmentId,
230        req: &LogSegmentUpdateRequest,
231    ) -> Result<(), ApiError> {
232        let resp = post_json!(self, req, "/api/v2/process/{process_id}/log/segment/{segment_id}")
233            .send()
234            .await?;
235
236        if resp.status().is_success() {
237            Ok(())
238        } else {
239            api_err!("Failed to update status: {}", resp.status())
240        }
241    }
242
243    pub async fn append_to_log_segment(
244        &self,
245        process_id: ProcessId,
246        segment_id: LogSegmentId,
247        body: bytes::Bytes,
248    ) -> Result<(), ApiError> {
249        let resp = post!(self, "/api/v2/process/{process_id}/log/segment/{segment_id}/data")
250            .header(header::CONTENT_TYPE, "application/octet-stream")
251            .body(body)
252            .send()
253            .await?;
254
255        if resp.status().is_success() {
256            Ok(())
257        } else {
258            api_err!("Failed to update status: {}", resp.status())
259        }
260    }
261}