use ironflow::store::{BeginResult, UnitOfWork};
use ironflow::{PgStore, Result, Store, WorkflowId};
use serde::Serialize;
use sqlx::PgPool;
pub async fn seed_events<E: Serialize + Send>(
store: &PgStore,
workflow_type: &'static str,
workflow_id: &WorkflowId,
events: Vec<E>,
) -> Result<()> {
let BeginResult::Active { mut uow, .. } = store.begin(workflow_type, workflow_id, None).await?
else {
return Ok(());
};
uow.append_events(events).await?;
uow.commit().await?;
Ok(())
}
pub async fn fetch_events(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
) -> Result<Vec<serde_json::Value>> {
let events: Vec<serde_json::Value> = sqlx::query_scalar!(
r#"SELECT payload FROM ironflow.events
WHERE workflow_type = $1 AND workflow_id = $2
ORDER BY sequence"#,
workflow_type,
workflow_id,
)
.fetch_all(pool)
.await?;
Ok(events)
}
pub async fn count_events_for_workflow(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
) -> Result<i64> {
let count = sqlx::query_scalar!(
"SELECT COUNT(*) FROM ironflow.events WHERE workflow_type = $1 AND workflow_id = $2",
workflow_type,
workflow_id
)
.fetch_one(pool)
.await?
.unwrap_or(0);
Ok(count)
}
pub async fn fetch_events_matching(
pool: &PgPool,
workflow_type: &str,
workflow_id_pattern: &str,
) -> Result<Vec<(String, serde_json::Value)>> {
let rows = sqlx::query!(
r#"SELECT workflow_id, payload FROM ironflow.events
WHERE workflow_type = $1 AND workflow_id LIKE $2
ORDER BY workflow_id, sequence"#,
workflow_type,
workflow_id_pattern,
)
.fetch_all(pool)
.await?;
Ok(rows
.into_iter()
.map(|r| (r.workflow_id, r.payload))
.collect())
}
pub async fn count_completed_workflows(
pool: &PgPool,
workflow_type: &str,
workflow_id_pattern: &str,
event_type: &str,
) -> Result<i64> {
let count = sqlx::query_scalar!(
r#"
SELECT COUNT(DISTINCT workflow_id)
FROM ironflow.events
WHERE workflow_type = $1
AND workflow_id LIKE $2
AND payload->>'type' = $3
"#,
workflow_type,
workflow_id_pattern,
event_type
)
.fetch_one(pool)
.await?
.unwrap_or(0);
Ok(count)
}
pub async fn fetch_effects(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
) -> Result<Vec<serde_json::Value>> {
let effects: Vec<serde_json::Value> = sqlx::query_scalar!(
r#"SELECT payload FROM ironflow.outbox
WHERE workflow_type = $1 AND workflow_id = $2
ORDER BY created_at"#,
workflow_type,
workflow_id,
)
.fetch_all(pool)
.await?;
Ok(effects)
}
pub async fn count_effects(
pool: &PgPool,
workflow_type: &str,
processed: Option<bool>,
workflow_ids: Option<&[&str]>,
) -> Result<i64> {
let mut query = String::from("SELECT COUNT(*) FROM ironflow.outbox WHERE workflow_type = $1");
if let Some(is_processed) = processed {
if is_processed {
query.push_str(" AND processed_at IS NOT NULL");
} else {
query.push_str(" AND processed_at IS NULL");
}
}
if workflow_ids.is_some() {
query.push_str(" AND workflow_id = ANY($2)");
}
let count: i64 = if let Some(ids) = workflow_ids {
let ids: Vec<String> = ids.iter().map(|s| s.to_string()).collect();
sqlx::query_scalar(&query)
.bind(workflow_type)
.bind(&ids)
.fetch_one(pool)
.await?
} else {
sqlx::query_scalar(&query)
.bind(workflow_type)
.fetch_one(pool)
.await?
};
Ok(count)
}
pub async fn is_effect_processed(pool: &PgPool, workflow_id: &str) -> Result<bool> {
let processed_at: Option<time::OffsetDateTime> = sqlx::query_scalar!(
"SELECT processed_at FROM ironflow.outbox WHERE workflow_id = $1",
workflow_id
)
.fetch_one(pool)
.await?;
Ok(processed_at.is_some())
}
pub async fn fetch_effect_id(pool: &PgPool, workflow_id: &str) -> Result<uuid::Uuid> {
let id = sqlx::query_scalar!(
"SELECT id FROM ironflow.outbox WHERE workflow_id = $1",
workflow_id
)
.fetch_one(pool)
.await?;
Ok(id)
}
pub async fn fetch_input_observations(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
) -> Result<Vec<(String, serde_json::Value)>> {
let rows = sqlx::query!(
r#"SELECT input_type, payload FROM ironflow.input_observations
WHERE workflow_type = $1 AND workflow_id = $2
ORDER BY observed_at"#,
workflow_type,
workflow_id,
)
.fetch_all(pool)
.await?;
Ok(rows
.into_iter()
.map(|row| (row.input_type, row.payload))
.collect())
}
pub async fn insert_outbox_effect(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
payload: serde_json::Value,
attempts: i32,
last_error: Option<&str>,
) -> Result<uuid::Uuid> {
let id = sqlx::query_scalar!(
r#"
INSERT INTO ironflow.outbox (workflow_type, workflow_id, payload, attempts, last_error)
VALUES ($1, $2, $3, $4, $5)
RETURNING id
"#,
workflow_type,
workflow_id,
payload,
attempts,
last_error,
)
.fetch_one(pool)
.await?;
Ok(id)
}
pub async fn fetch_outbox_payload(pool: &PgPool, workflow_id: &str) -> Result<serde_json::Value> {
let payload = sqlx::query_scalar!(
"SELECT payload FROM ironflow.outbox WHERE workflow_id = $1",
workflow_id
)
.fetch_one(pool)
.await?;
Ok(payload)
}
pub async fn count_timers(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
pending_only: bool,
key: Option<&str>,
) -> Result<i64> {
let mut query = String::from(
"SELECT COUNT(*) FROM ironflow.timers WHERE workflow_type = $1 AND workflow_id = $2",
);
if pending_only {
query.push_str(" AND processed_at IS NULL");
}
if key.is_some() {
query.push_str(" AND key = $3");
}
let count: i64 = if let Some(k) = key {
sqlx::query_scalar(&query)
.bind(workflow_type)
.bind(workflow_id)
.bind(k)
.fetch_one(pool)
.await?
} else {
sqlx::query_scalar(&query)
.bind(workflow_type)
.bind(workflow_id)
.fetch_one(pool)
.await?
};
Ok(count)
}
pub async fn fetch_timer_input(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
) -> Result<serde_json::Value> {
let input = sqlx::query_scalar!(
"SELECT input FROM ironflow.timers WHERE workflow_type = $1 AND workflow_id = $2",
workflow_type,
workflow_id
)
.fetch_one(pool)
.await?;
Ok(input)
}
pub async fn insert_due_timer(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
input: serde_json::Value,
) -> Result<()> {
sqlx::query!(
r#"
INSERT INTO ironflow.timers (workflow_type, workflow_id, fire_at, input)
VALUES ($1, $2, now() - interval '1 minute', $3)
"#,
workflow_type,
workflow_id,
input,
)
.execute(pool)
.await?;
Ok(())
}
pub async fn insert_future_timer(
pool: &PgPool,
workflow_type: &str,
workflow_id: &str,
input: serde_json::Value,
) -> Result<()> {
sqlx::query!(
r#"
INSERT INTO ironflow.timers (workflow_type, workflow_id, fire_at, input)
VALUES ($1, $2, now() + interval '1 hour', $3)
"#,
workflow_type,
workflow_id,
input,
)
.execute(pool)
.await?;
Ok(())
}
pub async fn count_events(pool: &PgPool) -> Result<i64> {
let count = sqlx::query_scalar!("SELECT COUNT(*) FROM ironflow.events")
.fetch_one(pool)
.await?
.unwrap_or(0);
Ok(count)
}