use crate::client::{CosmosDBClient, QueryParameter};
use crate::models::*;
use duroxide::providers::ProviderError;
use duroxide::TagFilter;
/// Looks up the oldest orchestrator-queue item that is currently eligible for pickup.
///
/// A document qualifies when its type is `DOC_TYPE_ORCH_QUEUE`, its `visibleAt` is at
/// or before `now_ms`, and its `lockedUntil` is missing, null, or at or before `now_ms`.
///
/// Additional filters:
/// * `my_slots` — restricts `dispatchSlot` to the listed values. The restriction is
///   omitted when 256 or more slots are supplied (presumably because that covers every
///   possible `u8` slot — confirm with the caller). An empty slice short-circuits to
///   `Ok(None)` without issuing a query.
/// * `min_version_packed` / `max_version_packed` — applied only when BOTH are `Some`;
///   documents without a `pinnedDuroxideVersionPacked` value always pass.
/// * `excluded_instances` — instance ids that must not be returned.
///
/// Among all matches, the one with the smallest `enqueued_at` is returned.
///
/// # Errors
///
/// Propagates any error from the query, and returns a permanent [`ProviderError`] if a
/// returned document cannot be deserialized into a [`QueueItemDocument`].
pub async fn find_candidate_orch_item(
    client: &CosmosDBClient,
    now_ms: u64,
    my_slots: &[u8],
    min_version_packed: Option<i64>,
    max_version_packed: Option<i64>,
    excluded_instances: &[String],
) -> Result<Option<QueueItemDocument>, ProviderError> {
    if my_slots.is_empty() {
        return Ok(None);
    }

    let mut params = vec![QueryParameter::new("@now", serde_json::json!(now_ms))];
    let mut sql = format!(
        "SELECT * FROM c \
         WHERE c.type = '{}' \
         AND c.visibleAt <= @now \
         AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now)",
        DOC_TYPE_ORCH_QUEUE
    );

    // Slots are plain `u8` values, so they are inlined rather than bound as parameters.
    if my_slots.len() < 256 {
        let slots: Vec<String> = my_slots.iter().map(u8::to_string).collect();
        sql.push_str(" AND c.dispatchSlot IN (");
        sql.push_str(&slots.join(","));
        sql.push(')');
    }

    // The version window only applies when both bounds were provided.
    if let Some((lower, upper)) = min_version_packed.zip(max_version_packed) {
        sql.push_str(
            " AND (NOT IS_DEFINED(c.pinnedDuroxideVersionPacked) \
             OR c.pinnedDuroxideVersionPacked = null \
             OR (c.pinnedDuroxideVersionPacked >= @minVersion \
             AND c.pinnedDuroxideVersionPacked <= @maxVersion))",
        );
        params.push(QueryParameter::new("@minVersion", serde_json::json!(lower)));
        params.push(QueryParameter::new("@maxVersion", serde_json::json!(upper)));
    }

    for (idx, excluded) in excluded_instances.iter().enumerate() {
        let placeholder = format!("@excl{idx}");
        sql.push_str(" AND c.instanceId != ");
        sql.push_str(&placeholder);
        params.push(QueryParameter::new(placeholder, serde_json::json!(excluded)));
    }

    let docs = client.query(&sql, params, None).await?;
    let mut candidates: Vec<QueueItemDocument> = Vec::with_capacity(docs.len());
    for doc in docs {
        let item = serde_json::from_value(doc).map_err(|e| {
            ProviderError::permanent(
                "find_candidate_orch_item",
                format!("Failed to deserialize queue item: {e}"),
            )
        })?;
        candidates.push(item);
    }

    // `min_by_key` yields the first of several equal minima, so ties resolve to the
    // earliest document in query order.
    Ok(candidates.into_iter().min_by_key(|item| item.enqueued_at))
}
pub async fn find_candidate_work_item(
client: &CosmosDBClient,
now_ms: u64,
my_slots: &[u8],
session_owner_id: Option<&str>,
excluded_items: &[String],
tag_filter: &TagFilter,
) -> Result<Option<QueueItemDocument>, ProviderError> {
if my_slots.is_empty() {
return Ok(None);
}
let mut sql = format!(
"SELECT * FROM c \
WHERE c.type = '{}' \
AND c.visibleAt <= @now \
AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now)",
DOC_TYPE_WORKER_QUEUE
);
if my_slots.len() < 256 {
let slot_list = my_slots
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join(",");
sql.push_str(&format!(" AND c.dispatchSlot IN ({})", slot_list));
}
let mut params = vec![QueryParameter::new("@now", serde_json::json!(now_ms))];
if session_owner_id.is_none() {
sql.push_str(" AND (NOT IS_DEFINED(c.sessionId) OR c.sessionId = null)");
}
match tag_filter {
TagFilter::DefaultOnly => {
sql.push_str(" AND (NOT IS_DEFINED(c.tag) OR c.tag = null)");
}
TagFilter::Tags(set) => {
let mut tags: Vec<String> = set.iter().cloned().collect();
tags.sort();
let tag_list: Vec<String> = tags
.iter()
.enumerate()
.map(|(i, tag)| {
let param_name = format!("@tag{i}");
params.push(QueryParameter::new(param_name.clone(), serde_json::json!(tag)));
param_name
})
.collect();
sql.push_str(&format!(" AND c.tag IN ({})", tag_list.join(",")));
}
TagFilter::DefaultAnd(set) => {
let mut tags: Vec<String> = set.iter().cloned().collect();
tags.sort();
let tag_list: Vec<String> = tags
.iter()
.enumerate()
.map(|(i, tag)| {
let param_name = format!("@tag{i}");
params.push(QueryParameter::new(param_name.clone(), serde_json::json!(tag)));
param_name
})
.collect();
sql.push_str(&format!(
" AND (NOT IS_DEFINED(c.tag) OR c.tag = null OR c.tag IN ({}))",
tag_list.join(",")
));
}
TagFilter::Any => {
}
TagFilter::None => {
sql.push_str(" AND false");
}
}
for (i, item_id) in excluded_items.iter().enumerate() {
let param_name = format!("@excl{i}");
sql.push_str(&format!(" AND c.id != {param_name}"));
params.push(QueryParameter::new(param_name, serde_json::json!(item_id)));
}
let results = client.query(&sql, params, None).await?;
let mut items: Vec<QueueItemDocument> = results
.into_iter()
.map(|doc| serde_json::from_value(doc))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
ProviderError::permanent(
"find_candidate_work_item",
format!("Failed to deserialize work item: {e}"),
)
})?;
items.sort_by_key(|i| i.enqueued_at);
Ok(items.into_iter().next())
}
/// Fetches every orchestrator-queue message for `instance_id` that is visible and
/// unlocked at `now_ms`, ordered by `enqueuedAt`.
///
/// A message is included when its `visibleAt` is at or before `now_ms` and its
/// `lockedUntil` is missing, null, or at or before `now_ms`. The instance id is also
/// passed as the third argument to `client.query` (presumably the partition key —
/// confirm against `CosmosDBClient::query`).
///
/// # Errors
///
/// Propagates any error from the query, and returns a permanent [`ProviderError`] for
/// the first document that cannot be deserialized into a [`QueueItemDocument`].
pub async fn collect_orch_messages(
    client: &CosmosDBClient,
    instance_id: &str,
    now_ms: u64,
) -> Result<Vec<QueueItemDocument>, ProviderError> {
    let params = vec![
        QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
        QueryParameter::new("@now", serde_json::json!(now_ms)),
    ];
    let sql = format!(
        "SELECT * FROM c \
         WHERE c.instanceId = @instanceId \
         AND c.type = '{}' \
         AND c.visibleAt <= @now \
         AND (NOT IS_DEFINED(c.lockedUntil) OR c.lockedUntil = null OR c.lockedUntil <= @now) \
         ORDER BY c.enqueuedAt",
        DOC_TYPE_ORCH_QUEUE
    );

    let docs = client.query(&sql, params, Some(instance_id)).await?;
    let mut messages: Vec<QueueItemDocument> = Vec::with_capacity(docs.len());
    for doc in docs {
        match serde_json::from_value(doc) {
            Ok(message) => messages.push(message),
            Err(e) => {
                return Err(ProviderError::permanent(
                    "collect_orch_messages",
                    format!("Failed to deserialize queue item: {e}"),
                ));
            }
        }
    }
    Ok(messages)
}
/// Loads the history documents of one execution of an instance, ordered by `eventId`.
///
/// Only documents of type `DOC_TYPE_HISTORY` whose `instanceId` and `executionId`
/// match the arguments are returned. The instance id is also passed as the third
/// argument to `client.query` (presumably the partition key — confirm against
/// `CosmosDBClient::query`).
///
/// # Errors
///
/// Propagates any error from the query, and returns a permanent [`ProviderError`] for
/// the first document that cannot be deserialized into a [`HistoryDocument`].
pub async fn fetch_history(
    client: &CosmosDBClient,
    instance_id: &str,
    execution_id: u64,
) -> Result<Vec<HistoryDocument>, ProviderError> {
    let params = vec![
        QueryParameter::new("@instanceId", serde_json::json!(instance_id)),
        QueryParameter::new("@executionId", serde_json::json!(execution_id)),
    ];
    let sql = format!(
        "SELECT * FROM c \
         WHERE c.instanceId = @instanceId \
         AND c.type = '{}' \
         AND c.executionId = @executionId \
         ORDER BY c.eventId",
        DOC_TYPE_HISTORY
    );

    let docs = client.query(&sql, params, Some(instance_id)).await?;
    let mut history: Vec<HistoryDocument> = Vec::with_capacity(docs.len());
    for doc in docs {
        let event = serde_json::from_value(doc).map_err(|e| {
            ProviderError::permanent(
                "fetch_history",
                format!("Failed to deserialize history doc: {e}"),
            )
        })?;
        history.push(event);
    }
    Ok(history)
}
/// Returns the raw JSON of every document with the given `doc_type` that belongs to
/// `instance_id`.
///
/// Both values are bound as query parameters. The instance id is also passed as the
/// third argument to `client.query` (presumably the partition key — confirm against
/// `CosmosDBClient::query`). Documents are returned undecoded.
///
/// # Errors
///
/// Propagates any error from the query.
pub async fn query_by_type_in_partition(
    client: &CosmosDBClient,
    instance_id: &str,
    doc_type: &str,
) -> Result<Vec<serde_json::Value>, ProviderError> {
    let instance_param = QueryParameter::new("@instanceId", serde_json::json!(instance_id));
    let type_param = QueryParameter::new("@type", serde_json::json!(doc_type));

    client
        .query(
            "SELECT * FROM c WHERE c.instanceId = @instanceId AND c.type = @type",
            vec![instance_param, type_param],
            Some(instance_id),
        )
        .await
}
/// Lists instance documents, optionally restricted to a single status.
///
/// With `status_filter` set, only instances whose `status` equals that value are
/// returned; with `None`, every document of type `DOC_TYPE_INSTANCE` is returned.
///
/// # Errors
///
/// Propagates any error from the query, and returns a permanent [`ProviderError`] for
/// the first document that cannot be deserialized into an [`InstanceDocument`].
pub async fn query_instances(
    client: &CosmosDBClient,
    status_filter: Option<&str>,
) -> Result<Vec<InstanceDocument>, ProviderError> {
    let mut sql = format!("SELECT * FROM c WHERE c.type = '{}'", DOC_TYPE_INSTANCE);
    let params = match status_filter {
        Some(status) => {
            sql.push_str(" AND c.status = @status");
            vec![QueryParameter::new("@status", serde_json::json!(status))]
        }
        None => Vec::new(),
    };

    let docs = client.query(&sql, params, None).await?;
    let mut instances: Vec<InstanceDocument> = Vec::with_capacity(docs.len());
    for doc in docs {
        let instance = serde_json::from_value(doc).map_err(|e| {
            ProviderError::permanent(
                "query_instances",
                format!("Failed to deserialize instance: {e}"),
            )
        })?;
        instances.push(instance);
    }
    Ok(instances)
}
/// Finds every queue item of the given `doc_type` whose `lockToken` equals
/// `lock_token`.
///
/// Both values are bound as query parameters, and `None` is passed as the third
/// argument to `client.query`.
///
/// # Errors
///
/// Propagates any error from the query, and returns a permanent [`ProviderError`] for
/// the first document that cannot be deserialized into a [`QueueItemDocument`].
pub async fn find_items_by_lock_token(
    client: &CosmosDBClient,
    lock_token: &str,
    doc_type: &str,
) -> Result<Vec<QueueItemDocument>, ProviderError> {
    // The statement has no interpolated parts, so a plain literal is sufficient.
    let sql = "SELECT * FROM c WHERE c.type = @type AND c.lockToken = @lockToken";
    let params = vec![
        QueryParameter::new("@type", serde_json::json!(doc_type)),
        QueryParameter::new("@lockToken", serde_json::json!(lock_token)),
    ];

    let docs = client.query(sql, params, None).await?;
    let mut locked_items: Vec<QueueItemDocument> = Vec::with_capacity(docs.len());
    for doc in docs {
        match serde_json::from_value(doc) {
            Ok(item) => locked_items.push(item),
            Err(e) => {
                return Err(ProviderError::permanent(
                    "find_items_by_lock_token",
                    format!("Failed to deserialize queue item: {e}"),
                ));
            }
        }
    }
    Ok(locked_items)
}
/// Finds the instance document whose `lockToken` equals `lock_token`, if any.
///
/// Only the first document returned by the query is examined; any further matches are
/// ignored. `Ok(None)` means no instance carries the token.
///
/// # Errors
///
/// Propagates any error from the query, and returns a permanent [`ProviderError`] if
/// the first document cannot be deserialized into an [`InstanceDocument`].
pub async fn find_instance_by_lock_token(
    client: &CosmosDBClient,
    lock_token: &str,
) -> Result<Option<InstanceDocument>, ProviderError> {
    let params = vec![QueryParameter::new(
        "@lockToken",
        serde_json::json!(lock_token),
    )];
    let sql = format!(
        "SELECT * FROM c WHERE c.type = '{}' AND c.lockToken = @lockToken",
        DOC_TYPE_INSTANCE
    );

    let first = client.query(&sql, params, None).await?.into_iter().next();
    match first {
        None => Ok(None),
        Some(doc) => serde_json::from_value::<InstanceDocument>(doc)
            .map(Some)
            .map_err(|e| {
                ProviderError::permanent(
                    "find_instance_by_lock_token",
                    format!("Failed to deserialize instance: {e}"),
                )
            }),
    }
}
/// Returns outbox intents that are still `'pending'` and were created at least
/// `age_threshold_ms` before `now_ms`.
///
/// The cutoff is `now_ms - age_threshold_ms`, clamped at zero by a saturating
/// subtraction; intents with `createdAt` at or before the cutoff are selected.
///
/// # Errors
///
/// Propagates any error from the query, and returns a permanent [`ProviderError`] for
/// the first document that cannot be deserialized into an `OutboxIntentDocument`.
pub async fn query_pending_intents(
    client: &CosmosDBClient,
    age_threshold_ms: u64,
    now_ms: u64,
) -> Result<Vec<crate::models::OutboxIntentDocument>, ProviderError> {
    let oldest_allowed = now_ms.saturating_sub(age_threshold_ms);
    let params = vec![QueryParameter::new(
        "@cutoff",
        serde_json::json!(oldest_allowed),
    )];
    let sql = format!(
        "SELECT * FROM c WHERE c.type = '{}' AND c.status = 'pending' AND c.createdAt <= @cutoff",
        DOC_TYPE_OUTBOX_INTENT
    );

    let docs = client.query(&sql, params, None).await?;
    let mut intents: Vec<crate::models::OutboxIntentDocument> = Vec::with_capacity(docs.len());
    for doc in docs {
        let intent = serde_json::from_value(doc).map_err(|e| {
            ProviderError::permanent(
                "query_pending_intents",
                format!("Failed to deserialize outbox intent: {e}"),
            )
        })?;
        intents.push(intent);
    }
    Ok(intents)
}
/// Lists the ids of every document that belongs to `instance_id`, regardless of type.
///
/// The query projects only `c.id`, so each returned JSON value carries just that
/// field. The instance id is also passed as the third argument to `client.query`
/// (presumably the partition key — confirm against `CosmosDBClient::query`).
///
/// # Errors
///
/// Propagates any error from the query.
pub async fn query_all_in_partition(
    client: &CosmosDBClient,
    instance_id: &str,
) -> Result<Vec<serde_json::Value>, ProviderError> {
    let instance_param = QueryParameter::new("@instanceId", serde_json::json!(instance_id));

    client
        .query(
            "SELECT c.id FROM c WHERE c.instanceId = @instanceId",
            vec![instance_param],
            Some(instance_id),
        )
        .await
}
/// Counts the documents of the given `doc_type`, optionally narrowed by an extra
/// filter.
///
/// The count is obtained by selecting `c.id` for every match and measuring the length
/// of the result list.
///
/// NOTE: `extra_filter` is spliced verbatim into the SQL text after `AND`; it is not
/// bound as a parameter. Callers must only pass trusted, well-formed predicates.
///
/// # Errors
///
/// Propagates any error from the query.
pub async fn count_by_type(
    client: &CosmosDBClient,
    doc_type: &str,
    extra_filter: Option<&str>,
) -> Result<usize, ProviderError> {
    let sql = match extra_filter {
        Some(filter) => format!("SELECT c.id FROM c WHERE c.type = @type AND {filter}"),
        None => String::from("SELECT c.id FROM c WHERE c.type = @type"),
    };
    let params = vec![QueryParameter::new("@type", serde_json::json!(doc_type))];

    client
        .query(&sql, params, None)
        .await
        .map(|rows| rows.len())
}