Skip to main content

deepstore_client/
client.rs

1//! High-level DeepStore client implementation
2
3use crate::{
4    error::{DeepstoreError, Result},
5    streaming::{DraftDocumentBatch, StreamParams},
6    types::{QueryTokenProvider, SearchParams, SearchResult},
7};
8use chrono::{DateTime, Utc};
9use deepstore_agent_client::Client as AgentClient;
10use deepstore_server_client::Client as ServerClient;
11use serde::Deserialize;
12use serde_json::Value as JsonValue;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tracing::debug;
16
17/// Token cache entry
18#[derive(Debug, Clone)]
19struct TokenInfo {
20    token: String,
21    expires_at: Option<DateTime<Utc>>,
22}
23
24/// High-level DeepStore client that coordinates control plane and data plane calls
25pub struct DeepstoreClient {
26    /// Control plane URL (for SSE streaming)
27    control_plane_url: String,
28    /// Auth proxy URL for data plane requests (Agent Manager)
29    auth_proxy_url: String,
30    /// Token provider function
31    token_provider: Arc<QueryTokenProvider>,
32    /// Cached token
33    token_cache: Arc<RwLock<Option<TokenInfo>>>,
34    /// Request timeout
35    timeout: std::time::Duration,
36}
37
38impl DeepstoreClient {
39    /// Create a new client builder
40    pub fn builder() -> DeepstoreClientBuilder {
41        DeepstoreClientBuilder::default()
42    }
43
44    /// Get a valid token (from cache or provider)
45    async fn get_token(&self) -> Result<String> {
46        // Check cache first
47        {
48            let cache = self.token_cache.read().await;
49            if let Some(token_info) = cache.as_ref() {
50                // Check if token is still valid (with 30s buffer)
51                if let Some(expires_at) = token_info.expires_at {
52                    if Utc::now() < expires_at - chrono::Duration::seconds(30) {
53                        return Ok(token_info.token.clone());
54                    }
55                } else {
56                    // No expiration - token is always valid
57                    return Ok(token_info.token.clone());
58                }
59            }
60        }
61
62        // Get fresh token from provider
63        let token = (self.token_provider)().await?;
64
65        // Try to decode JWT to get expiration
66        let expires_at = decode_jwt_expiration(&token);
67
68        // Update cache
69        {
70            let mut cache = self.token_cache.write().await;
71            *cache = Some(TokenInfo {
72                token: token.clone(),
73                expires_at,
74            });
75        }
76
77        Ok(token)
78    }
79
80
81    /// Search for documents
82    ///
83    /// Fetches splits from control plane, then searches via auth proxy.
84    ///
85    /// For dev/testing environments where splits aren't tracked, the data plane
86    /// can search an in-memory buffer by passing an empty splits array.
87    pub async fn search(&self, params: SearchParams) -> Result<SearchResult> {
88        let token = self.get_token().await?;
89
90        debug!(
91            database_id = %params.database_id,
92            query = %params.query,
93            "Fetching splits from control plane"
94        );
95
96        // 1. Fetch splits from control plane
97        let splits = self
98            .fetch_splits(
99                &params.database_id,
100                &params.start_time,
101                &params.end_time,
102                &token,
103            )
104            .await?;
105
106        debug!(
107            split_count = splits.len(),
108            "Fetched splits (empty = in-memory search only)"
109        );
110
111        // 2. Build search request for data plane
112        // Note: Empty splits array signals to dev server to search in-memory buffer
113        let search_request = deepstore_agent_client::types::SearchRequest {
114            query: params.query.clone(),
115            splits: splits.into_iter().map(|s| s.into()).collect(),
116            max_hits: params.max_hits.map(|m| m as u64),
117            start_timestamp: Some(params.start_time.to_rfc3339()),
118            end_timestamp: Some(params.end_time.to_rfc3339()),
119            sort_by: params.sort_by.clone(),
120            search_after: params.search_after.unwrap_or_default(),
121            snippet_fields: params.snippet_fields.unwrap_or_default(),
122            search_fields: params.search_fields.unwrap_or_default(),
123            aggs: params.aggregations,
124            start_offset: params.start_offset.map(|o| o as u64),
125        };
126
127        debug!(
128            query = %params.query,
129            splits_count = search_request.splits.len(),
130            "Searching via auth proxy"
131        );
132
133        // 3. Search via auth proxy (Agent Manager → DeepStore reader)
134        let agent_client = AgentClient::new_authenticated(&self.auth_proxy_url, &token, self.timeout);
135
136        let search_response = agent_client
137            .post_search(None, &search_request)
138            .await
139            .map_err(|e| DeepstoreError::Network(format!("Search request failed: {}", e)))?
140            .into_inner();
141
142        debug!(
143            num_hits = search_response.num_hits,
144            hits_count = search_response.hits.len(),
145            "Search completed"
146        );
147
148        // Convert to our public SearchResult type
149        Ok(SearchResult {
150            num_hits: search_response.num_hits as usize,
151            hits: search_response.hits,
152            snippets: if search_response.snippets.is_empty() {
153                None
154            } else {
155                Some(search_response.snippets)
156            },
157            elapsed_time_micros: search_response.elapsed_time_micros as u64,
158            errors: search_response.errors,
159            aggregations: search_response.aggregations,
160        })
161    }
162
163    /// Fetch documents from draft files
164    ///
165    /// Used for streaming - fetches raw documents before indexing.
166    pub async fn get_draft_documents(
167        &self,
168        database_id: &str,
169        draft_ids: Vec<String>,
170    ) -> Result<DraftDocumentBatch> {
171        let token = self.get_token().await?;
172
173        debug!(
174            database_id = %database_id,
175            draft_count = draft_ids.len(),
176            "Fetching draft documents"
177        );
178
179        let request = deepstore_agent_client::types::GetDraftDocumentsRequest {
180            draft_ids: draft_ids.clone(),
181        };
182
183        let agent_client = AgentClient::new_authenticated(&self.auth_proxy_url, &token, self.timeout);
184
185        let response = agent_client
186            .post_drafts_documents(None, &request)
187            .await
188            .map_err(|e| DeepstoreError::Network(format!("Draft documents request failed: {}", e)))?
189            .into_inner();
190
191        debug!(
192            document_count = response.documents.len(),
193            "Fetched draft documents"
194        );
195
196        Ok(DraftDocumentBatch {
197            documents: response.documents,
198            draft_ids: Some(draft_ids),
199        })
200    }
201
202    /// Build SSE URL for streaming
203    ///
204    /// Returns the URL with token as query parameter (EventSource can't send headers).
205    pub fn build_sse_url(&self, params: &StreamParams, token: &str) -> String {
206        let mut url = format!(
207            "{}/v1/databases/{}/drafts-stream",
208            self.control_plane_url.trim_end_matches('/'),
209            urlencoding::encode(&params.database_id)
210        );
211
212        url.push_str(&format!("?token={}", urlencoding::encode(token)));
213
214        if let Some(start) = params.start_time {
215            url.push_str(&format!("&startTimestamp={}", start.to_rfc3339()));
216        }
217
218        url
219    }
220
221    /// Fetch splits from control plane
222    async fn fetch_splits(
223        &self,
224        database_id: &str,
225        start_time: &DateTime<Utc>,
226        end_time: &DateTime<Utc>,
227        token: &str,
228    ) -> Result<Vec<SplitMetadata>> {
229        let start_timestamp = start_time.timestamp();
230        let end_timestamp = end_time.timestamp();
231
232        let server_client = ServerClient::new_authenticated(&self.control_plane_url, token, self.timeout);
233
234        let response = server_client
235            .list_splits()
236            .database_id(database_id)
237            .start_timestamp(&start_timestamp.to_string())
238            .end_timestamp(&end_timestamp.to_string())
239            .send()
240            .await
241            .map_err(|e| DeepstoreError::Network(format!("Failed to fetch splits: {}", e)))?;
242
243        Ok(response
244            .into_inner()
245            .splits
246            .into_iter()
247            .map(SplitMetadata::from)
248            .collect())
249    }
250}
251
/// Flattened view of one split as listed by the control plane.
///
/// Built from the server's split listing and converted into the agent's
/// request type for searches.
#[derive(Clone, Debug)]
struct SplitMetadata {
    /// Identifier of the split.
    split_id: String,
    /// Start of the split's time range as reported by the control plane
    /// (presumably Unix seconds; confirm against the server schema).
    time_range_start: i64,
    /// End of the split's time range, same units as `time_range_start`.
    time_range_end: i64,
    /// Footer start offset (presumably a byte offset); 0 when the control
    /// plane omitted footer offsets.
    footer_offsets_start: i64,
    /// Footer end offset; 0 when the control plane omitted footer offsets.
    footer_offsets_end: i64,
}
261
262impl From<deepstore_server_client::types::ListSplitsResponseSplitsItem> for SplitMetadata {
263    fn from(s: deepstore_server_client::types::ListSplitsResponseSplitsItem) -> Self {
264        Self {
265            split_id: s.split_id,
266            time_range_start: s.time_range_start as i64,
267            time_range_end: s.time_range_end as i64,
268            footer_offsets_start: s
269                .footer_offsets
270                .as_ref()
271                .map(|f| f.start as i64)
272                .unwrap_or(0),
273            footer_offsets_end: s.footer_offsets.as_ref().map(|f| f.end as i64).unwrap_or(0),
274        }
275    }
276}
277
278impl From<SplitMetadata> for deepstore_agent_client::types::SplitMetadata {
279    fn from(s: SplitMetadata) -> Self {
280        Self {
281            split_id: s.split_id.clone(),
282            database_id: "".to_string(), // Not used by reader
283            time_range_start: Some(s.time_range_start),
284            time_range_end: Some(s.time_range_end),
285            num_docs: 0, // Not needed for search
286            footer_offsets_start: s.footer_offsets_start as u64,
287            footer_offsets_end: s.footer_offsets_end as u64,
288            full_metadata: serde_json::Value::Object(serde_json::Map::new()),
289        }
290    }
291}
292
293/// Decode JWT expiration claim
294fn decode_jwt_expiration(token: &str) -> Option<DateTime<Utc>> {
295    let parts: Vec<&str> = token.split('.').collect();
296    if parts.len() != 3 {
297        return None;
298    }
299
300    // Decode base64url payload
301    let payload_b64 = parts[1];
302    let decoded = base64_decode_url_safe(payload_b64).ok()?;
303    let json: JsonValue = serde_json::from_slice(&decoded).ok()?;
304
305    // Get exp claim
306    let exp = json.get("exp")?.as_i64()?;
307    DateTime::from_timestamp(exp, 0)
308}
309
310fn base64_decode_url_safe(input: &str) -> std::result::Result<Vec<u8>, String> {
311    use base64::Engine;
312    base64::engine::general_purpose::URL_SAFE_NO_PAD
313        .decode(input)
314        .map_err(|e| e.to_string())
315}
316
317/// Builder for DeepStore client
318#[derive(Default)]
319pub struct DeepstoreClientBuilder {
320    control_plane_url: Option<String>,
321    auth_proxy_url: Option<String>,
322    token_provider: Option<Arc<QueryTokenProvider>>,
323    timeout: Option<std::time::Duration>,
324}
325
326impl DeepstoreClientBuilder {
327    /// Set control plane URL (DeepStore server for split metadata)
328    pub fn control_plane_url(mut self, url: impl Into<String>) -> Self {
329        self.control_plane_url = Some(url.into());
330        self
331    }
332
333    /// Set auth proxy URL (Agent Manager for search/draft requests)
334    pub fn auth_proxy_url(mut self, url: impl Into<String>) -> Self {
335        self.auth_proxy_url = Some(url.into());
336        self
337    }
338
339    /// Set query token provider function
340    pub fn query_token_provider<F, Fut>(mut self, provider: F) -> Self
341    where
342        F: Fn() -> Fut + Send + Sync + 'static,
343        Fut: std::future::Future<Output = Result<String>> + Send + 'static,
344    {
345        self.token_provider = Some(Arc::new(Box::new(move || Box::pin(provider()))));
346        self
347    }
348
349    /// Set request timeout (default: 30s)
350    pub fn timeout(mut self, timeout: std::time::Duration) -> Self {
351        self.timeout = Some(timeout);
352        self
353    }
354
355    /// Build the client
356    pub fn build(self) -> Result<DeepstoreClient> {
357        let control_plane_url = self.control_plane_url.ok_or_else(|| {
358            DeepstoreError::Configuration("control_plane_url is required".to_string())
359        })?;
360
361        let auth_proxy_url = self.auth_proxy_url.ok_or_else(|| {
362            DeepstoreError::Configuration("auth_proxy_url is required".to_string())
363        })?;
364
365        let token_provider = self.token_provider.ok_or_else(|| {
366            DeepstoreError::Configuration("token_provider is required".to_string())
367        })?;
368
369        let timeout = self.timeout.unwrap_or(std::time::Duration::from_secs(30));
370
371        // Normalize control plane URL - ensures /v1 suffix
372        let control_plane_base = normalize_control_plane_url(&control_plane_url);
373        // Data plane URL is used as-is (caller specifies complete base URL)
374        let auth_proxy_base = auth_proxy_url.trim_end_matches('/').to_string();
375
376        Ok(DeepstoreClient {
377            control_plane_url: control_plane_base,
378            auth_proxy_url: auth_proxy_base,
379            token_provider,
380            token_cache: Arc::new(RwLock::new(None)),
381            timeout,
382        })
383    }
384}
385
/// Canonicalize a control plane base URL.
///
/// Trailing slashes are dropped, and `/v1` is appended unless the trimmed URL
/// already ends with it.
fn normalize_control_plane_url(url: &str) -> String {
    let base = url.trim_end_matches('/');
    match base.strip_suffix("/v1") {
        Some(_) => base.to_owned(),
        None => [base, "/v1"].concat(),
    }
}