use anyhow::Result;
use chrono::Utc;
use sqlx::PgPool;
use stormchaser_model::events::{
EventSource, EventType, SchemaVersion, StepCompletedEvent, StepEventType,
};
use stormchaser_model::nats::{compute_shard_id, publish_cloudevent, NatsSubject};
use stormchaser_model::{Connection, RunId, StepInstanceId};
/// Resolves the HTTP API URL stored on the connection named `connection_name`.
///
/// A database connection is acquired from `pool`, the connection record is
/// looked up by name, and its config is searched for a string value under
/// `base_url`, falling back to the legacy `url` key.
///
/// # Errors
///
/// Returns an error when:
/// - a database connection cannot be acquired or the lookup query fails,
/// - no connection with the given name exists,
/// - the connection is not of type `HttpApi`,
/// - neither `base_url` nor `url` holds a string value in the config.
pub async fn fetch_http_api_url_from_connection(
    pool: &PgPool,
    connection_name: &str,
) -> Result<String> {
    let mut db_conn = pool.acquire().await?;
    let lookup = crate::db::connections::get_storage_backend_by_name::<
        &mut sqlx::PgConnection,
        Connection,
    >(&mut *db_conn, connection_name)
    .await?;

    // Guard: the named connection has to exist.
    let record = match lookup {
        Some(found) => found,
        None => anyhow::bail!("Connection {} not found", connection_name),
    };

    // Guard: only HTTP API connections carry a usable URL.
    if record.connection_type != stormchaser_model::ConnectionType::HttpApi {
        anyhow::bail!("Connection {} must be of type HttpApi", connection_name);
    }

    // Prefer `base_url`; the legacy `url` key is consulted only when
    // `base_url` is absent or is not a string.
    let config = &record.config;
    let preferred = config.get("base_url").and_then(|value| value.as_str());
    let resolved = preferred.or_else(|| config.get("url").and_then(|value| value.as_str()));

    match resolved {
        Some(url) => Ok(url.to_string()),
        None => anyhow::bail!(
            "Connection {} is missing 'base_url' (or legacy 'url') in its config",
            connection_name
        ),
    }
}
/// Publishes a "step completed" CloudEvent for the given step of a run.
///
/// A [`StepCompletedEvent`] is built from `run_id`, `step_id`,
/// `fencing_token` and `outputs`, stamped with the current UTC time, and
/// serialized to JSON. The remaining optional fields (runner id, storage
/// hashes, artifacts, test reports, exit code) are left as `None`. The event
/// is published through a JetStream context created from `nats_client`, on the
/// `StepCompleted` subject for the shard computed from `run_id`, with
/// `EventSource::System` as the source and schema version `"1.0"`.
///
/// # Errors
///
/// Returns an error if the event cannot be serialized to JSON or if
/// publishing the CloudEvent fails.
pub async fn publish_step_completed_event(
    run_id: RunId,
    step_id: StepInstanceId,
    fencing_token: i64,
    outputs: Option<std::collections::HashMap<String, serde_json::Value>>,
    nats_client: async_nats::Client,
) -> Result<()> {
    // The destination subject is derived from the run id's shard.
    let subject = NatsSubject::StepCompleted(Some(compute_shard_id(&run_id)));

    let payload = serde_json::to_value(StepCompletedEvent {
        run_id,
        step_id,
        fencing_token,
        event_type: EventType::Step(StepEventType::Completed),
        runner_id: None,
        storage_hashes: None,
        artifacts: None,
        test_reports: None,
        outputs,
        exit_code: None,
        timestamp: Utc::now(),
    })?;

    let jetstream = async_nats::jetstream::new(nats_client);
    let schema_version = SchemaVersion::new(String::from("1.0"));

    publish_cloudevent(
        &jetstream,
        subject,
        EventType::Step(StepEventType::Completed),
        EventSource::System,
        payload,
        Some(schema_version),
        None,
    )
    .await?;

    Ok(())
}