Skip to main content

duroxide_cdb/
query.rs

1use crate::client::{CosmosDBClient, QueryParameter};
2use crate::models::*;
3use duroxide::providers::ProviderError;
4use duroxide::TagFilter;
5
6/// Query for visible, unlocked orchestrator queue items in specific dispatch slots.
7pub async fn find_candidate_orch_item(
8    client: &CosmosDBClient,
9    now_ms: u64,
10    my_slots: &[u8],
11    min_version_packed: Option<i64>,
12    max_version_packed: Option<i64>,
13    excluded_instances: &[String],
14) -> Result<Option<QueueItemDocument>, ProviderError> {
15    // If no slots assigned, skip query entirely
16    if my_slots.is_empty() {
17        return Ok(None);
18    }
19
20    let mut sql = format!(
21        "SELECT * FROM c \
22         WHERE c.type = '{}' \
23         AND c.visibleAt <= @now \
24         AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now)",
25        DOC_TYPE_ORCH_QUEUE
26    );
27
28    // Only add dispatchSlot filter when NOT all 256 slots are assigned
29    if my_slots.len() < 256 {
30        let slot_list = my_slots
31            .iter()
32            .map(|s| s.to_string())
33            .collect::<Vec<_>>()
34            .join(",");
35        sql.push_str(&format!(" AND c.dispatchSlot IN ({})", slot_list));
36    }
37
38    let mut params = vec![QueryParameter::new("@now", serde_json::json!(now_ms))];
39
40    // Capability filter
41    if let (Some(min_v), Some(max_v)) = (min_version_packed, max_version_packed) {
42        sql.push_str(
43            " AND (NOT IS_DEFINED(c.pinnedDuroxideVersionPacked) \
44             OR c.pinnedDuroxideVersionPacked = null \
45             OR (c.pinnedDuroxideVersionPacked >= @minVersion \
46                 AND c.pinnedDuroxideVersionPacked <= @maxVersion))",
47        );
48        params.push(QueryParameter::new("@minVersion", serde_json::json!(min_v)));
49        params.push(QueryParameter::new("@maxVersion", serde_json::json!(max_v)));
50    }
51
52    // Exclude instances we already failed to lock
53    for (i, instance) in excluded_instances.iter().enumerate() {
54        let param_name = format!("@excl{i}");
55        sql.push_str(&format!(" AND c.instanceId != {param_name}"));
56        params.push(QueryParameter::new(param_name, serde_json::json!(instance)));
57    }
58
59    // No ORDER BY — cross-partition queries don't support it via gateway.
60    // Sort client-side and pick the earliest.
61    let results = client.query(&sql, params, None).await?;
62
63    let mut items: Vec<QueueItemDocument> = results
64        .into_iter()
65        .map(serde_json::from_value)
66        .collect::<Result<Vec<_>, _>>()
67        .map_err(|e| {
68            ProviderError::permanent(
69                "find_candidate_orch_item",
70                format!("Failed to deserialize queue item: {e}"),
71            )
72        })?;
73
74    // Sort by enqueuedAt ascending, pick earliest
75    items.sort_by_key(|i| i.enqueued_at);
76    Ok(items.into_iter().next())
77}
78
79/// Query for visible, unlocked worker queue items in specific dispatch slots.
80pub async fn find_candidate_work_item(
81    client: &CosmosDBClient,
82    now_ms: u64,
83    my_slots: &[u8],
84    session_owner_id: Option<&str>,
85    excluded_items: &[String],
86    tag_filter: &TagFilter,
87) -> Result<Option<QueueItemDocument>, ProviderError> {
88    // If no slots assigned, skip query entirely
89    if my_slots.is_empty() {
90        return Ok(None);
91    }
92
93    let mut sql = format!(
94        "SELECT * FROM c \
95         WHERE c.type = '{}' \
96         AND c.visibleAt <= @now \
97         AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now)",
98        DOC_TYPE_WORKER_QUEUE
99    );
100
101    // Only add dispatchSlot filter when NOT all 256 slots are assigned
102    if my_slots.len() < 256 {
103        let slot_list = my_slots
104            .iter()
105            .map(|s| s.to_string())
106            .collect::<Vec<_>>()
107            .join(",");
108        sql.push_str(&format!(" AND c.dispatchSlot IN ({})", slot_list));
109    }
110
111    let mut params = vec![QueryParameter::new("@now", serde_json::json!(now_ms))];
112
113    // If no session config, skip session items
114    if session_owner_id.is_none() {
115        sql.push_str(" AND (NOT IS_DEFINED(c.sessionId) OR c.sessionId = null)");
116    }
117
118    // Tag filter clause
119    match tag_filter {
120        TagFilter::DefaultOnly => {
121            sql.push_str(" AND (NOT IS_DEFINED(c.tag) OR c.tag = null)");
122        }
123        TagFilter::Tags(set) => {
124            let mut tags: Vec<String> = set.iter().cloned().collect();
125            tags.sort();
126            let tag_list: Vec<String> = tags
127                .iter()
128                .enumerate()
129                .map(|(i, tag)| {
130                    let param_name = format!("@tag{i}");
131                    params.push(QueryParameter::new(
132                        param_name.clone(),
133                        serde_json::json!(tag),
134                    ));
135                    param_name
136                })
137                .collect();
138            sql.push_str(&format!(" AND c.tag IN ({})", tag_list.join(",")));
139        }
140        TagFilter::DefaultAnd(set) => {
141            let mut tags: Vec<String> = set.iter().cloned().collect();
142            tags.sort();
143            let tag_list: Vec<String> = tags
144                .iter()
145                .enumerate()
146                .map(|(i, tag)| {
147                    let param_name = format!("@tag{i}");
148                    params.push(QueryParameter::new(
149                        param_name.clone(),
150                        serde_json::json!(tag),
151                    ));
152                    param_name
153                })
154                .collect();
155            sql.push_str(&format!(
156                " AND (NOT IS_DEFINED(c.tag) OR c.tag = null OR c.tag IN ({}))",
157                tag_list.join(",")
158            ));
159        }
160        TagFilter::Any => {
161            // No additional filter — fetch everything
162        }
163        TagFilter::None => {
164            // Should not reach here (caller returns early), but be safe
165            sql.push_str(" AND false");
166        }
167    }
168
169    // Exclude items we already failed to lock
170    for (i, item_id) in excluded_items.iter().enumerate() {
171        let param_name = format!("@excl{i}");
172        sql.push_str(&format!(" AND c.id != {param_name}"));
173        params.push(QueryParameter::new(param_name, serde_json::json!(item_id)));
174    }
175
176    // No ORDER BY — cross-partition queries don't support it via gateway.
177    // Sort client-side and pick the earliest.
178    let results = client.query(&sql, params, None).await?;
179
180    let mut items: Vec<QueueItemDocument> = results
181        .into_iter()
182        .map(serde_json::from_value)
183        .collect::<Result<Vec<_>, _>>()
184        .map_err(|e| {
185            ProviderError::permanent(
186                "find_candidate_work_item",
187                format!("Failed to deserialize work item: {e}"),
188            )
189        })?;
190
191    // Sort by enqueuedAt ascending, pick earliest
192    items.sort_by_key(|i| i.enqueued_at);
193    Ok(items.into_iter().next())
194}
195
196/// Collect all pending messages for an instance from the orchestrator queue.
197pub async fn collect_orch_messages(
198    client: &CosmosDBClient,
199    instance_id: &str,
200    now_ms: u64,
201) -> Result<Vec<QueueItemDocument>, ProviderError> {
202    let sql = format!(
203        "SELECT * FROM c \
204         WHERE c.instanceId = @instanceId \
205         AND c.type = '{}' \
206         AND c.visibleAt <= @now \
207         AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now) \
208         ORDER BY c.enqueuedAt",
209        DOC_TYPE_ORCH_QUEUE
210    );
211
212    let params = vec![
213        QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
214        QueryParameter::new("@now", serde_json::json!(now_ms)),
215    ];
216
217    let results = client.query(&sql, params, Some(instance_id)).await?;
218
219    results
220        .into_iter()
221        .map(|doc| {
222            serde_json::from_value(doc).map_err(|e| {
223                ProviderError::permanent(
224                    "collect_orch_messages",
225                    format!("Failed to deserialize queue item: {e}"),
226                )
227            })
228        })
229        .collect()
230}
231
232/// Fetch history for an instance/execution.
233pub async fn fetch_history(
234    client: &CosmosDBClient,
235    instance_id: &str,
236    execution_id: u64,
237) -> Result<Vec<HistoryDocument>, ProviderError> {
238    let sql = format!(
239        "SELECT * FROM c \
240         WHERE c.instanceId = @instanceId \
241         AND c.type = '{}' \
242         AND c.executionId = @executionId \
243         ORDER BY c.eventId",
244        DOC_TYPE_HISTORY
245    );
246
247    let params = vec![
248        QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
249        QueryParameter::new("@executionId", serde_json::json!(execution_id)),
250    ];
251
252    let results = client.query(&sql, params, Some(instance_id)).await?;
253
254    results
255        .into_iter()
256        .map(|doc| {
257            serde_json::from_value(doc).map_err(|e| {
258                ProviderError::permanent(
259                    "fetch_history",
260                    format!("Failed to deserialize history doc: {e}"),
261                )
262            })
263        })
264        .collect()
265}
266
267/// Query for all documents by type in a partition.
268pub async fn query_by_type_in_partition(
269    client: &CosmosDBClient,
270    instance_id: &str,
271    doc_type: &str,
272) -> Result<Vec<serde_json::Value>, ProviderError> {
273    let sql = "SELECT * FROM c WHERE c.instanceId = @instanceId AND c.type = @type";
274    let params = vec![
275        QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
276        QueryParameter::new("@type", serde_json::json!(doc_type)),
277    ];
278    client.query(sql, params, Some(instance_id)).await
279}
280
281/// Query instances across all partitions with optional status filter.
282pub async fn query_instances(
283    client: &CosmosDBClient,
284    status_filter: Option<&str>,
285) -> Result<Vec<InstanceDocument>, ProviderError> {
286    let mut sql = format!("SELECT * FROM c WHERE c.type = '{}'", DOC_TYPE_INSTANCE);
287    let mut params = vec![];
288
289    if let Some(status) = status_filter {
290        sql.push_str(" AND c.status = @status");
291        params.push(QueryParameter::new("@status", serde_json::json!(status)));
292    }
293
294    let results = client.query(&sql, params, None).await?;
295
296    results
297        .into_iter()
298        .map(|doc| {
299            serde_json::from_value(doc).map_err(|e| {
300                ProviderError::permanent(
301                    "query_instances",
302                    format!("Failed to deserialize instance: {e}"),
303                )
304            })
305        })
306        .collect()
307}
308
309/// Find orch_queue items locked by a specific lock token.
310pub async fn find_items_by_lock_token(
311    client: &CosmosDBClient,
312    lock_token: &str,
313    doc_type: &str,
314) -> Result<Vec<QueueItemDocument>, ProviderError> {
315    let sql = "SELECT * FROM c WHERE c.type = @type AND c.lockToken = @lockToken".to_string();
316    let params = vec![
317        QueryParameter::new("@type", serde_json::json!(doc_type)),
318        QueryParameter::new("@lockToken", serde_json::json!(lock_token)),
319    ];
320
321    let results = client.query(&sql, params, None).await?;
322
323    results
324        .into_iter()
325        .map(|doc| {
326            serde_json::from_value(doc).map_err(|e| {
327                ProviderError::permanent(
328                    "find_items_by_lock_token",
329                    format!("Failed to deserialize queue item: {e}"),
330                )
331            })
332        })
333        .collect()
334}
335
336/// Find instance by lock token.
337pub async fn find_instance_by_lock_token(
338    client: &CosmosDBClient,
339    lock_token: &str,
340) -> Result<Option<InstanceDocument>, ProviderError> {
341    let sql = format!(
342        "SELECT * FROM c WHERE c.type = '{}' AND c.lockToken = @lockToken",
343        DOC_TYPE_INSTANCE
344    );
345    let params = vec![QueryParameter::new(
346        "@lockToken",
347        serde_json::json!(lock_token),
348    )];
349
350    let results = client.query(&sql, params, None).await?;
351
352    if let Some(doc) = results.into_iter().next() {
353        let inst: InstanceDocument = serde_json::from_value(doc).map_err(|e| {
354            ProviderError::permanent(
355                "find_instance_by_lock_token",
356                format!("Failed to deserialize instance: {e}"),
357            )
358        })?;
359        Ok(Some(inst))
360    } else {
361        Ok(None)
362    }
363}
364
365/// Query pending outbox intents older than a threshold.
366pub async fn query_pending_intents(
367    client: &CosmosDBClient,
368    age_threshold_ms: u64,
369    now_ms: u64,
370) -> Result<Vec<crate::models::OutboxIntentDocument>, ProviderError> {
371    let cutoff = now_ms.saturating_sub(age_threshold_ms);
372    let sql = format!(
373        "SELECT * FROM c WHERE c.type = '{}' AND c.status = 'pending' AND c.createdAt <= @cutoff",
374        DOC_TYPE_OUTBOX_INTENT
375    );
376    let params = vec![QueryParameter::new("@cutoff", serde_json::json!(cutoff))];
377
378    let results = client.query(&sql, params, None).await?;
379
380    results
381        .into_iter()
382        .map(|doc| {
383            serde_json::from_value(doc).map_err(|e| {
384                ProviderError::permanent(
385                    "query_pending_intents",
386                    format!("Failed to deserialize outbox intent: {e}"),
387                )
388            })
389        })
390        .collect()
391}
392
393/// Query all documents in a partition (for deletion).
394pub async fn query_all_in_partition(
395    client: &CosmosDBClient,
396    instance_id: &str,
397) -> Result<Vec<serde_json::Value>, ProviderError> {
398    let sql = "SELECT c.id FROM c WHERE c.instanceId = @instanceId";
399    let params = vec![QueryParameter::new(
400        "@instanceId",
401        serde_json::json!(instance_id),
402    )];
403    client.query(sql, params, Some(instance_id)).await
404}
405
406/// Count documents by type (cross-partition).
407/// Uses client-side counting since cross-partition aggregates aren't supported via gateway.
408pub async fn count_by_type(
409    client: &CosmosDBClient,
410    doc_type: &str,
411    extra_filter: Option<&str>,
412) -> Result<usize, ProviderError> {
413    let mut sql = "SELECT c.id FROM c WHERE c.type = @type".to_string();
414    if let Some(filter) = extra_filter {
415        sql.push_str(&format!(" AND {filter}"));
416    }
417    let params = vec![QueryParameter::new("@type", serde_json::json!(doc_type))];
418    let results = client.query(&sql, params, None).await?;
419    Ok(results.len())
420}