1use crate::{
4 error::{DeepstoreError, Result},
5 streaming::{DraftDocumentBatch, StreamParams},
6 types::{QueryTokenProvider, SearchParams, SearchResult},
7};
8use chrono::{DateTime, Utc};
9use deepstore_agent_client::Client as AgentClient;
10use deepstore_server_client::Client as ServerClient;
11use serde::Deserialize;
12use serde_json::Value as JsonValue;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tracing::debug;
16
17#[derive(Debug, Clone)]
19struct TokenInfo {
20 token: String,
21 expires_at: Option<DateTime<Utc>>,
22}
23
24pub struct DeepstoreClient {
26 control_plane_url: String,
28 auth_proxy_url: String,
30 token_provider: Arc<QueryTokenProvider>,
32 token_cache: Arc<RwLock<Option<TokenInfo>>>,
34 timeout: std::time::Duration,
36}
37
38impl DeepstoreClient {
39 pub fn builder() -> DeepstoreClientBuilder {
41 DeepstoreClientBuilder::default()
42 }
43
44 async fn get_token(&self) -> Result<String> {
46 {
48 let cache = self.token_cache.read().await;
49 if let Some(token_info) = cache.as_ref() {
50 if let Some(expires_at) = token_info.expires_at {
52 if Utc::now() < expires_at - chrono::Duration::seconds(30) {
53 return Ok(token_info.token.clone());
54 }
55 } else {
56 return Ok(token_info.token.clone());
58 }
59 }
60 }
61
62 let token = (self.token_provider)().await?;
64
65 let expires_at = decode_jwt_expiration(&token);
67
68 {
70 let mut cache = self.token_cache.write().await;
71 *cache = Some(TokenInfo {
72 token: token.clone(),
73 expires_at,
74 });
75 }
76
77 Ok(token)
78 }
79
80
81 pub async fn search(&self, params: SearchParams) -> Result<SearchResult> {
88 let token = self.get_token().await?;
89
90 debug!(
91 database_id = %params.database_id,
92 query = %params.query,
93 "Fetching splits from control plane"
94 );
95
96 let splits = self
98 .fetch_splits(
99 ¶ms.database_id,
100 ¶ms.start_time,
101 ¶ms.end_time,
102 &token,
103 )
104 .await?;
105
106 debug!(
107 split_count = splits.len(),
108 "Fetched splits (empty = in-memory search only)"
109 );
110
111 let search_request = deepstore_agent_client::types::SearchRequest {
114 query: params.query.clone(),
115 splits: splits.into_iter().map(|s| s.into()).collect(),
116 max_hits: params.max_hits.map(|m| m as u64),
117 start_timestamp: Some(params.start_time.to_rfc3339()),
118 end_timestamp: Some(params.end_time.to_rfc3339()),
119 sort_by: params.sort_by.clone(),
120 search_after: params.search_after.unwrap_or_default(),
121 snippet_fields: params.snippet_fields.unwrap_or_default(),
122 search_fields: params.search_fields.unwrap_or_default(),
123 aggs: params.aggregations,
124 start_offset: params.start_offset.map(|o| o as u64),
125 };
126
127 debug!(
128 query = %params.query,
129 splits_count = search_request.splits.len(),
130 "Searching via auth proxy"
131 );
132
133 let agent_client = AgentClient::new_authenticated(&self.auth_proxy_url, &token, self.timeout);
135
136 let search_response = agent_client
137 .post_search(None, &search_request)
138 .await
139 .map_err(|e| DeepstoreError::Network(format!("Search request failed: {}", e)))?
140 .into_inner();
141
142 debug!(
143 num_hits = search_response.num_hits,
144 hits_count = search_response.hits.len(),
145 "Search completed"
146 );
147
148 Ok(SearchResult {
150 num_hits: search_response.num_hits as usize,
151 hits: search_response.hits,
152 snippets: if search_response.snippets.is_empty() {
153 None
154 } else {
155 Some(search_response.snippets)
156 },
157 elapsed_time_micros: search_response.elapsed_time_micros as u64,
158 errors: search_response.errors,
159 aggregations: search_response.aggregations,
160 })
161 }
162
163 pub async fn get_draft_documents(
167 &self,
168 database_id: &str,
169 draft_ids: Vec<String>,
170 ) -> Result<DraftDocumentBatch> {
171 let token = self.get_token().await?;
172
173 debug!(
174 database_id = %database_id,
175 draft_count = draft_ids.len(),
176 "Fetching draft documents"
177 );
178
179 let request = deepstore_agent_client::types::GetDraftDocumentsRequest {
180 draft_ids: draft_ids.clone(),
181 };
182
183 let agent_client = AgentClient::new_authenticated(&self.auth_proxy_url, &token, self.timeout);
184
185 let response = agent_client
186 .post_drafts_documents(None, &request)
187 .await
188 .map_err(|e| DeepstoreError::Network(format!("Draft documents request failed: {}", e)))?
189 .into_inner();
190
191 debug!(
192 document_count = response.documents.len(),
193 "Fetched draft documents"
194 );
195
196 Ok(DraftDocumentBatch {
197 documents: response.documents,
198 draft_ids: Some(draft_ids),
199 })
200 }
201
202 pub fn build_sse_url(&self, params: &StreamParams, token: &str) -> String {
206 let mut url = format!(
207 "{}/v1/databases/{}/drafts-stream",
208 self.control_plane_url.trim_end_matches('/'),
209 urlencoding::encode(¶ms.database_id)
210 );
211
212 url.push_str(&format!("?token={}", urlencoding::encode(token)));
213
214 if let Some(start) = params.start_time {
215 url.push_str(&format!("&startTimestamp={}", start.to_rfc3339()));
216 }
217
218 url
219 }
220
221 async fn fetch_splits(
223 &self,
224 database_id: &str,
225 start_time: &DateTime<Utc>,
226 end_time: &DateTime<Utc>,
227 token: &str,
228 ) -> Result<Vec<SplitMetadata>> {
229 let start_timestamp = start_time.timestamp();
230 let end_timestamp = end_time.timestamp();
231
232 let server_client = ServerClient::new_authenticated(&self.control_plane_url, token, self.timeout);
233
234 let response = server_client
235 .list_splits()
236 .database_id(database_id)
237 .start_timestamp(&start_timestamp.to_string())
238 .end_timestamp(&end_timestamp.to_string())
239 .send()
240 .await
241 .map_err(|e| DeepstoreError::Network(format!("Failed to fetch splits: {}", e)))?;
242
243 Ok(response
244 .into_inner()
245 .splits
246 .into_iter()
247 .map(SplitMetadata::from)
248 .collect())
249 }
250}
251
252#[derive(Debug, Clone)]
254struct SplitMetadata {
255 split_id: String,
256 time_range_start: i64,
257 time_range_end: i64,
258 footer_offsets_start: i64,
259 footer_offsets_end: i64,
260}
261
262impl From<deepstore_server_client::types::ListSplitsResponseSplitsItem> for SplitMetadata {
263 fn from(s: deepstore_server_client::types::ListSplitsResponseSplitsItem) -> Self {
264 Self {
265 split_id: s.split_id,
266 time_range_start: s.time_range_start as i64,
267 time_range_end: s.time_range_end as i64,
268 footer_offsets_start: s
269 .footer_offsets
270 .as_ref()
271 .map(|f| f.start as i64)
272 .unwrap_or(0),
273 footer_offsets_end: s.footer_offsets.as_ref().map(|f| f.end as i64).unwrap_or(0),
274 }
275 }
276}
277
278impl From<SplitMetadata> for deepstore_agent_client::types::SplitMetadata {
279 fn from(s: SplitMetadata) -> Self {
280 Self {
281 split_id: s.split_id.clone(),
282 database_id: "".to_string(), time_range_start: Some(s.time_range_start),
284 time_range_end: Some(s.time_range_end),
285 num_docs: 0, footer_offsets_start: s.footer_offsets_start as u64,
287 footer_offsets_end: s.footer_offsets_end as u64,
288 full_metadata: serde_json::Value::Object(serde_json::Map::new()),
289 }
290 }
291}
292
293fn decode_jwt_expiration(token: &str) -> Option<DateTime<Utc>> {
295 let parts: Vec<&str> = token.split('.').collect();
296 if parts.len() != 3 {
297 return None;
298 }
299
300 let payload_b64 = parts[1];
302 let decoded = base64_decode_url_safe(payload_b64).ok()?;
303 let json: JsonValue = serde_json::from_slice(&decoded).ok()?;
304
305 let exp = json.get("exp")?.as_i64()?;
307 DateTime::from_timestamp(exp, 0)
308}
309
310fn base64_decode_url_safe(input: &str) -> std::result::Result<Vec<u8>, String> {
311 use base64::Engine;
312 base64::engine::general_purpose::URL_SAFE_NO_PAD
313 .decode(input)
314 .map_err(|e| e.to_string())
315}
316
317#[derive(Default)]
319pub struct DeepstoreClientBuilder {
320 control_plane_url: Option<String>,
321 auth_proxy_url: Option<String>,
322 token_provider: Option<Arc<QueryTokenProvider>>,
323 timeout: Option<std::time::Duration>,
324}
325
326impl DeepstoreClientBuilder {
327 pub fn control_plane_url(mut self, url: impl Into<String>) -> Self {
329 self.control_plane_url = Some(url.into());
330 self
331 }
332
333 pub fn auth_proxy_url(mut self, url: impl Into<String>) -> Self {
335 self.auth_proxy_url = Some(url.into());
336 self
337 }
338
339 pub fn query_token_provider<F, Fut>(mut self, provider: F) -> Self
341 where
342 F: Fn() -> Fut + Send + Sync + 'static,
343 Fut: std::future::Future<Output = Result<String>> + Send + 'static,
344 {
345 self.token_provider = Some(Arc::new(Box::new(move || Box::pin(provider()))));
346 self
347 }
348
349 pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
351 self.timeout = Some(timeout);
352 self
353 }
354
355 pub fn build(self) -> Result<DeepstoreClient> {
357 let control_plane_url = self.control_plane_url.ok_or_else(|| {
358 DeepstoreError::Configuration("control_plane_url is required".to_string())
359 })?;
360
361 let auth_proxy_url = self.auth_proxy_url.ok_or_else(|| {
362 DeepstoreError::Configuration("auth_proxy_url is required".to_string())
363 })?;
364
365 let token_provider = self.token_provider.ok_or_else(|| {
366 DeepstoreError::Configuration("token_provider is required".to_string())
367 })?;
368
369 let timeout = self.timeout.unwrap_or(std::time::Duration::from_secs(30));
370
371 let control_plane_base = normalize_control_plane_url(&control_plane_url);
373 let auth_proxy_base = auth_proxy_url.trim_end_matches('/').to_string();
375
376 Ok(DeepstoreClient {
377 control_plane_url: control_plane_base,
378 auth_proxy_url: auth_proxy_base,
379 token_provider,
380 token_cache: Arc::new(RwLock::new(None)),
381 timeout,
382 })
383 }
384}
385
386fn normalize_control_plane_url(url: &str) -> String {
388 let trimmed = url.trim_end_matches('/');
389 if trimmed.ends_with("/v1") {
390 trimmed.to_string()
391 } else {
392 format!("{}/v1", trimmed)
393 }
394}