1use std::borrow::Cow;
2use std::path::PathBuf;
3
4use http::{
5 HeaderValue,
6 header::{self, HeaderMap},
7};
8use reqwest::multipart;
9use url::Url;
10
11use crate::model::{ApiToken, ProcessEntry, StartProcessResponse};
12use crate::{
13 api_err, api_error,
14 error::ApiError,
15 model::{
16 AgentId, LogSegmentId, LogSegmentOperationResponse, LogSegmentRequest, LogSegmentUpdateRequest,
17 ProcessId, ProcessStatus, SessionToken, USER_AGENT_VALUE,
18 },
19};
20
/// Starts a GET request builder for a path resolved against the client's base URL.
///
/// `$client` must expose `config.base_url` and `parent.client`; the remaining arguments are
/// handed to `format!` to produce the path. Panics with "valid url" if the formatted path
/// cannot be joined onto the base URL.
macro_rules! get {
    ($client:expr, $url_fmt:expr $(, $args:expr)*) => {{
        let endpoint = $client
            .config
            .base_url
            .join(&format!($url_fmt, $($args),*))
            .expect("valid url");
        $client.parent.client.get(endpoint)
    }};
}
27
/// Starts a POST request builder for a path resolved against the client's base URL.
///
/// `$client` must expose `config.base_url` and `parent.client`; the remaining arguments are
/// handed to `format!` to produce the path. Panics with "valid url" if the formatted path
/// cannot be joined onto the base URL.
macro_rules! post {
    ($client:expr, $url_fmt:expr $(, $args:expr)*) => {{
        let endpoint = $client
            .config
            .base_url
            .join(&format!($url_fmt, $($args),*))
            .expect("valid url");
        $client.parent.client.post(endpoint)
    }};
}
36
/// Starts a POST request builder (via `post!`) whose body is `$body` serialized as JSON.
///
/// The `application/json` content type is set explicitly before the body is attached.
macro_rules! post_json {
    ($client:expr, $body:expr, $url_fmt:expr $(, $args:expr)*) => {{
        let request = post!($client, $url_fmt $(, $args),*);
        request
            .header(header::CONTENT_TYPE, "application/json")
            .json($body)
    }};
}
44
45pub struct Config {
46 pub base_url: Url,
47 pub session_token: Option<SessionToken>,
48 pub api_token: Option<ApiToken>,
49 pub temp_dir: Option<PathBuf>,
50}
51
52pub struct ApiClient {
53 config: Config,
54 client: reqwest::Client,
55}
56
57impl ApiClient {
58 pub fn new(config: Config) -> Result<Self, ApiError> {
59 let mut default_headers = HeaderMap::new();
60 default_headers.insert(header::USER_AGENT, HeaderValue::from_static(USER_AGENT_VALUE));
61
62 if let Some(session_token) = &config.session_token {
63 default_headers.insert(
64 "X-Concord-SessionToken",
65 HeaderValue::try_from(session_token).map_err(|e| api_error!("Invalid session_token: {e}"))?,
66 );
67 } else if let Some(api_token) = &config.api_token {
68 default_headers.insert(
69 "Authorization",
70 HeaderValue::try_from(api_token).map_err(|e| api_error!("Invalid api_token: {e}"))?,
71 );
72 }
73
74 let client = reqwest::ClientBuilder::new()
75 .default_headers(default_headers)
76 .build()?;
77
78 Ok(ApiClient { config, client })
79 }
80
81 pub fn process_api(&self) -> ProcessApiClient {
82 ProcessApiClient {
83 config: &self.config,
84 parent: self,
85 }
86 }
87}
88
89pub struct ProcessApiClient<'a> {
90 config: &'a Config,
91 parent: &'a ApiClient,
92}
93
94impl std::fmt::Debug for ProcessApiClient<'_> {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("ProcessApiClient")
97 .field("config.base_url", &self.config.base_url)
98 .finish()
99 }
100}
101
102impl ProcessApiClient<'_> {
103 pub async fn start_process<I, T>(&self, input: I) -> Result<StartProcessResponse, ApiError>
104 where
105 I: IntoIterator<Item = (T, multipart::Part)>,
106 T: Into<Cow<'static, str>>,
107 {
108 let mut form = multipart::Form::new();
109 for (k, v) in input.into_iter() {
110 form = form.part(k, v);
111 }
112
113 let resp = post!(self, "/api/v1/process").multipart(form).send().await?;
114
115 if resp.status().is_success() {
116 Ok(resp.json().await?)
117 } else {
118 api_err!("Failed to start a process: {}", resp.status())
119 }
120 }
121
122 pub async fn get_process(&self, process_id: &ProcessId) -> Result<ProcessEntry, ApiError> {
123 let resp = get!(self, "/api/v1/process/{process_id}").send().await?;
124
125 if resp.status().is_success() {
126 Ok(resp.json().await?)
127 } else {
128 api_err!("Failed to get process: {}", resp.status())
129 }
130 }
131
132 pub async fn update_status(
133 &self,
134 process_id: ProcessId,
135 agent_id: AgentId,
136 status: ProcessStatus,
137 ) -> Result<(), ApiError> {
138 let req = format!("{status}");
139 let resp = post!(self, "/api/v1/process/{process_id}/status")
140 .query(&[("agent_id", agent_id)])
141 .header(header::CONTENT_TYPE, "text/plain")
142 .body(req)
143 .send()
144 .await?;
145
146 if resp.status().is_success() {
147 Ok(())
148 } else {
149 api_err!("Failed to update status: {}", resp.status())
150 }
151 }
152
153 pub async fn download_state(&self, process_id: ProcessId) -> Result<PathBuf, ApiError> {
154 let resp = get!(self, "/api/v1/process/{process_id}/state/snapshot")
155 .send()
156 .await?;
157
158 if !resp.status().is_success() {
159 return api_err!("Failed to download process state: {}", resp.status());
160 }
161
162 let path = self.new_state_file_path(process_id.to_string(), ".zip").await?;
163 let mut file = tokio::fs::File::create(&path)
164 .await
165 .map_err(|e| api_error!("Failed to create a temporary file: {}", e))?;
166
167 use futures::StreamExt;
168 use tokio::io::AsyncWriteExt;
169 let mut stream = resp.bytes_stream();
170 while let Some(chunk) = stream.next().await {
171 let chunk = chunk?;
172 file.write_all(&chunk)
173 .await
174 .map_err(|e| api_error!("Failed to write to a temporary file: {}", e))?;
175 }
176
177 Ok(path)
178 }
179
180 async fn new_state_file_path<PREFIX: AsRef<str>, SUFFIX: AsRef<str>>(
181 &self,
182 prefix: PREFIX,
183 suffix: SUFFIX,
184 ) -> Result<PathBuf, ApiError> {
185 let prefix = prefix.as_ref();
186 let suffix = suffix.as_ref();
187
188 let temp_dir = self
189 .config
190 .temp_dir
191 .clone()
192 .ok_or_else(|| api_error!("config.temp_dir is not set"))?;
193
194 let mut path = temp_dir.join(format!("{prefix}{suffix}"));
195 let mut n = 0;
196 loop {
197 let exists = path.try_exists().map_err(|e| api_error!("IO error: {}", e))?;
198 if !exists {
199 break;
200 } else {
201 path = temp_dir.join(format!("{prefix}-{n}{suffix}"));
202 n += 1;
203 }
204 }
205
206 Ok(path)
207 }
208
209 pub async fn create_log_segment(
210 &self,
211 process_id: ProcessId,
212 req: &LogSegmentRequest,
213 ) -> Result<LogSegmentId, ApiError> {
214 let resp = post_json!(self, req, "/api/v2/process/{process_id}/log/segment")
215 .send()
216 .await?;
217
218 if resp.status().is_success() {
219 let resp = resp.json::<LogSegmentOperationResponse>().await?;
220 Ok(resp.id)
221 } else {
222 api_err!("Failed to update status: {}", resp.status())
223 }
224 }
225
226 pub async fn update_log_segment(
227 &self,
228 process_id: ProcessId,
229 segment_id: LogSegmentId,
230 req: &LogSegmentUpdateRequest,
231 ) -> Result<(), ApiError> {
232 let resp = post_json!(self, req, "/api/v2/process/{process_id}/log/segment/{segment_id}")
233 .send()
234 .await?;
235
236 if resp.status().is_success() {
237 Ok(())
238 } else {
239 api_err!("Failed to update status: {}", resp.status())
240 }
241 }
242
243 pub async fn append_to_log_segment(
244 &self,
245 process_id: ProcessId,
246 segment_id: LogSegmentId,
247 body: bytes::Bytes,
248 ) -> Result<(), ApiError> {
249 let resp = post!(self, "/api/v2/process/{process_id}/log/segment/{segment_id}/data")
250 .header(header::CONTENT_TYPE, "application/octet-stream")
251 .body(body)
252 .send()
253 .await?;
254
255 if resp.status().is_success() {
256 Ok(())
257 } else {
258 api_err!("Failed to update status: {}", resp.status())
259 }
260 }
261}