1use crate::client::{CosmosDBClient, QueryParameter};
2use crate::models::*;
3use duroxide::providers::ProviderError;
4use duroxide::TagFilter;
5
6pub async fn find_candidate_orch_item(
8 client: &CosmosDBClient,
9 now_ms: u64,
10 my_slots: &[u8],
11 min_version_packed: Option<i64>,
12 max_version_packed: Option<i64>,
13 excluded_instances: &[String],
14) -> Result<Option<QueueItemDocument>, ProviderError> {
15 if my_slots.is_empty() {
17 return Ok(None);
18 }
19
20 let mut sql = format!(
21 "SELECT * FROM c \
22 WHERE c.type = '{}' \
23 AND c.visibleAt <= @now \
24 AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now)",
25 DOC_TYPE_ORCH_QUEUE
26 );
27
28 if my_slots.len() < 256 {
30 let slot_list = my_slots
31 .iter()
32 .map(|s| s.to_string())
33 .collect::<Vec<_>>()
34 .join(",");
35 sql.push_str(&format!(" AND c.dispatchSlot IN ({})", slot_list));
36 }
37
38 let mut params = vec![QueryParameter::new("@now", serde_json::json!(now_ms))];
39
40 if let (Some(min_v), Some(max_v)) = (min_version_packed, max_version_packed) {
42 sql.push_str(
43 " AND (NOT IS_DEFINED(c.pinnedDuroxideVersionPacked) \
44 OR c.pinnedDuroxideVersionPacked = null \
45 OR (c.pinnedDuroxideVersionPacked >= @minVersion \
46 AND c.pinnedDuroxideVersionPacked <= @maxVersion))",
47 );
48 params.push(QueryParameter::new("@minVersion", serde_json::json!(min_v)));
49 params.push(QueryParameter::new("@maxVersion", serde_json::json!(max_v)));
50 }
51
52 for (i, instance) in excluded_instances.iter().enumerate() {
54 let param_name = format!("@excl{i}");
55 sql.push_str(&format!(" AND c.instanceId != {param_name}"));
56 params.push(QueryParameter::new(param_name, serde_json::json!(instance)));
57 }
58
59 let results = client.query(&sql, params, None).await?;
62
63 let mut items: Vec<QueueItemDocument> = results
64 .into_iter()
65 .map(serde_json::from_value)
66 .collect::<Result<Vec<_>, _>>()
67 .map_err(|e| {
68 ProviderError::permanent(
69 "find_candidate_orch_item",
70 format!("Failed to deserialize queue item: {e}"),
71 )
72 })?;
73
74 items.sort_by_key(|i| i.enqueued_at);
76 Ok(items.into_iter().next())
77}
78
79pub async fn find_candidate_work_item(
81 client: &CosmosDBClient,
82 now_ms: u64,
83 my_slots: &[u8],
84 session_owner_id: Option<&str>,
85 excluded_items: &[String],
86 tag_filter: &TagFilter,
87) -> Result<Option<QueueItemDocument>, ProviderError> {
88 if my_slots.is_empty() {
90 return Ok(None);
91 }
92
93 let mut sql = format!(
94 "SELECT * FROM c \
95 WHERE c.type = '{}' \
96 AND c.visibleAt <= @now \
97 AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now)",
98 DOC_TYPE_WORKER_QUEUE
99 );
100
101 if my_slots.len() < 256 {
103 let slot_list = my_slots
104 .iter()
105 .map(|s| s.to_string())
106 .collect::<Vec<_>>()
107 .join(",");
108 sql.push_str(&format!(" AND c.dispatchSlot IN ({})", slot_list));
109 }
110
111 let mut params = vec![QueryParameter::new("@now", serde_json::json!(now_ms))];
112
113 if session_owner_id.is_none() {
115 sql.push_str(" AND (NOT IS_DEFINED(c.sessionId) OR c.sessionId = null)");
116 }
117
118 match tag_filter {
120 TagFilter::DefaultOnly => {
121 sql.push_str(" AND (NOT IS_DEFINED(c.tag) OR c.tag = null)");
122 }
123 TagFilter::Tags(set) => {
124 let mut tags: Vec<String> = set.iter().cloned().collect();
125 tags.sort();
126 let tag_list: Vec<String> = tags
127 .iter()
128 .enumerate()
129 .map(|(i, tag)| {
130 let param_name = format!("@tag{i}");
131 params.push(QueryParameter::new(
132 param_name.clone(),
133 serde_json::json!(tag),
134 ));
135 param_name
136 })
137 .collect();
138 sql.push_str(&format!(" AND c.tag IN ({})", tag_list.join(",")));
139 }
140 TagFilter::DefaultAnd(set) => {
141 let mut tags: Vec<String> = set.iter().cloned().collect();
142 tags.sort();
143 let tag_list: Vec<String> = tags
144 .iter()
145 .enumerate()
146 .map(|(i, tag)| {
147 let param_name = format!("@tag{i}");
148 params.push(QueryParameter::new(
149 param_name.clone(),
150 serde_json::json!(tag),
151 ));
152 param_name
153 })
154 .collect();
155 sql.push_str(&format!(
156 " AND (NOT IS_DEFINED(c.tag) OR c.tag = null OR c.tag IN ({}))",
157 tag_list.join(",")
158 ));
159 }
160 TagFilter::Any => {
161 }
163 TagFilter::None => {
164 sql.push_str(" AND false");
166 }
167 }
168
169 for (i, item_id) in excluded_items.iter().enumerate() {
171 let param_name = format!("@excl{i}");
172 sql.push_str(&format!(" AND c.id != {param_name}"));
173 params.push(QueryParameter::new(param_name, serde_json::json!(item_id)));
174 }
175
176 let results = client.query(&sql, params, None).await?;
179
180 let mut items: Vec<QueueItemDocument> = results
181 .into_iter()
182 .map(serde_json::from_value)
183 .collect::<Result<Vec<_>, _>>()
184 .map_err(|e| {
185 ProviderError::permanent(
186 "find_candidate_work_item",
187 format!("Failed to deserialize work item: {e}"),
188 )
189 })?;
190
191 items.sort_by_key(|i| i.enqueued_at);
193 Ok(items.into_iter().next())
194}
195
196pub async fn collect_orch_messages(
198 client: &CosmosDBClient,
199 instance_id: &str,
200 now_ms: u64,
201) -> Result<Vec<QueueItemDocument>, ProviderError> {
202 let sql = format!(
203 "SELECT * FROM c \
204 WHERE c.instanceId = @instanceId \
205 AND c.type = '{}' \
206 AND c.visibleAt <= @now \
207 AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now) \
208 ORDER BY c.enqueuedAt",
209 DOC_TYPE_ORCH_QUEUE
210 );
211
212 let params = vec![
213 QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
214 QueryParameter::new("@now", serde_json::json!(now_ms)),
215 ];
216
217 let results = client.query(&sql, params, Some(instance_id)).await?;
218
219 results
220 .into_iter()
221 .map(|doc| {
222 serde_json::from_value(doc).map_err(|e| {
223 ProviderError::permanent(
224 "collect_orch_messages",
225 format!("Failed to deserialize queue item: {e}"),
226 )
227 })
228 })
229 .collect()
230}
231
232pub async fn fetch_history(
234 client: &CosmosDBClient,
235 instance_id: &str,
236 execution_id: u64,
237) -> Result<Vec<HistoryDocument>, ProviderError> {
238 let sql = format!(
239 "SELECT * FROM c \
240 WHERE c.instanceId = @instanceId \
241 AND c.type = '{}' \
242 AND c.executionId = @executionId \
243 ORDER BY c.eventId",
244 DOC_TYPE_HISTORY
245 );
246
247 let params = vec![
248 QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
249 QueryParameter::new("@executionId", serde_json::json!(execution_id)),
250 ];
251
252 let results = client.query(&sql, params, Some(instance_id)).await?;
253
254 results
255 .into_iter()
256 .map(|doc| {
257 serde_json::from_value(doc).map_err(|e| {
258 ProviderError::permanent(
259 "fetch_history",
260 format!("Failed to deserialize history doc: {e}"),
261 )
262 })
263 })
264 .collect()
265}
266
267pub async fn query_by_type_in_partition(
269 client: &CosmosDBClient,
270 instance_id: &str,
271 doc_type: &str,
272) -> Result<Vec<serde_json::Value>, ProviderError> {
273 let sql = "SELECT * FROM c WHERE c.instanceId = @instanceId AND c.type = @type";
274 let params = vec![
275 QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
276 QueryParameter::new("@type", serde_json::json!(doc_type)),
277 ];
278 client.query(sql, params, Some(instance_id)).await
279}
280
281pub async fn query_instances(
283 client: &CosmosDBClient,
284 status_filter: Option<&str>,
285) -> Result<Vec<InstanceDocument>, ProviderError> {
286 let mut sql = format!("SELECT * FROM c WHERE c.type = '{}'", DOC_TYPE_INSTANCE);
287 let mut params = vec![];
288
289 if let Some(status) = status_filter {
290 sql.push_str(" AND c.status = @status");
291 params.push(QueryParameter::new("@status", serde_json::json!(status)));
292 }
293
294 let results = client.query(&sql, params, None).await?;
295
296 results
297 .into_iter()
298 .map(|doc| {
299 serde_json::from_value(doc).map_err(|e| {
300 ProviderError::permanent(
301 "query_instances",
302 format!("Failed to deserialize instance: {e}"),
303 )
304 })
305 })
306 .collect()
307}
308
309pub async fn find_items_by_lock_token(
311 client: &CosmosDBClient,
312 lock_token: &str,
313 doc_type: &str,
314) -> Result<Vec<QueueItemDocument>, ProviderError> {
315 let sql = "SELECT * FROM c WHERE c.type = @type AND c.lockToken = @lockToken".to_string();
316 let params = vec![
317 QueryParameter::new("@type", serde_json::json!(doc_type)),
318 QueryParameter::new("@lockToken", serde_json::json!(lock_token)),
319 ];
320
321 let results = client.query(&sql, params, None).await?;
322
323 results
324 .into_iter()
325 .map(|doc| {
326 serde_json::from_value(doc).map_err(|e| {
327 ProviderError::permanent(
328 "find_items_by_lock_token",
329 format!("Failed to deserialize queue item: {e}"),
330 )
331 })
332 })
333 .collect()
334}
335
336pub async fn find_instance_by_lock_token(
338 client: &CosmosDBClient,
339 lock_token: &str,
340) -> Result<Option<InstanceDocument>, ProviderError> {
341 let sql = format!(
342 "SELECT * FROM c WHERE c.type = '{}' AND c.lockToken = @lockToken",
343 DOC_TYPE_INSTANCE
344 );
345 let params = vec![QueryParameter::new(
346 "@lockToken",
347 serde_json::json!(lock_token),
348 )];
349
350 let results = client.query(&sql, params, None).await?;
351
352 if let Some(doc) = results.into_iter().next() {
353 let inst: InstanceDocument = serde_json::from_value(doc).map_err(|e| {
354 ProviderError::permanent(
355 "find_instance_by_lock_token",
356 format!("Failed to deserialize instance: {e}"),
357 )
358 })?;
359 Ok(Some(inst))
360 } else {
361 Ok(None)
362 }
363}
364
365pub async fn query_pending_intents(
367 client: &CosmosDBClient,
368 age_threshold_ms: u64,
369 now_ms: u64,
370) -> Result<Vec<crate::models::OutboxIntentDocument>, ProviderError> {
371 let cutoff = now_ms.saturating_sub(age_threshold_ms);
372 let sql = format!(
373 "SELECT * FROM c WHERE c.type = '{}' AND c.status = 'pending' AND c.createdAt <= @cutoff",
374 DOC_TYPE_OUTBOX_INTENT
375 );
376 let params = vec![QueryParameter::new("@cutoff", serde_json::json!(cutoff))];
377
378 let results = client.query(&sql, params, None).await?;
379
380 results
381 .into_iter()
382 .map(|doc| {
383 serde_json::from_value(doc).map_err(|e| {
384 ProviderError::permanent(
385 "query_pending_intents",
386 format!("Failed to deserialize outbox intent: {e}"),
387 )
388 })
389 })
390 .collect()
391}
392
393pub async fn query_all_in_partition(
395 client: &CosmosDBClient,
396 instance_id: &str,
397) -> Result<Vec<serde_json::Value>, ProviderError> {
398 let sql = "SELECT c.id FROM c WHERE c.instanceId = @instanceId";
399 let params = vec![QueryParameter::new(
400 "@instanceId",
401 serde_json::json!(instance_id),
402 )];
403 client.query(sql, params, Some(instance_id)).await
404}
405
406pub async fn count_by_type(
409 client: &CosmosDBClient,
410 doc_type: &str,
411 extra_filter: Option<&str>,
412) -> Result<usize, ProviderError> {
413 let mut sql = "SELECT c.id FROM c WHERE c.type = @type".to_string();
414 if let Some(filter) = extra_filter {
415 sql.push_str(&format!(" AND {filter}"));
416 }
417 let params = vec![QueryParameter::new("@type", serde_json::json!(doc_type))];
418 let results = client.query(&sql, params, None).await?;
419 Ok(results.len())
420}