use async_trait::async_trait;
use camel_api::RuntimeQueryResult;
use crate::CamelError;
use crate::lifecycle::application::route_definition::RouteDefinition;
use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteStatusProjection {
pub route_id: String,
pub status: String,
}
#[async_trait]
pub trait RouteRepositoryPort: Send + Sync {
async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError>;
async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError>;
async fn save_if_version(
&self,
aggregate: RouteRuntimeAggregate,
expected_version: u64,
) -> Result<(), CamelError>;
async fn delete(&self, route_id: &str) -> Result<(), CamelError>;
}
#[async_trait]
pub trait ProjectionStorePort: Send + Sync {
async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError>;
async fn get_status(&self, route_id: &str)
-> Result<Option<RouteStatusProjection>, CamelError>;
async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError>;
async fn remove_status(&self, route_id: &str) -> Result<(), CamelError>;
}
#[async_trait]
pub trait EventPublisherPort: Send + Sync {
async fn publish(&self, events: &[RuntimeEvent]) -> Result<(), CamelError>;
}
#[async_trait]
pub trait RuntimeEventJournalPort: Send + Sync {
async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), CamelError>;
async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError>;
async fn append_command_id(&self, _command_id: &str) -> Result<(), CamelError> {
Ok(())
}
async fn remove_command_id(&self, _command_id: &str) -> Result<(), CamelError> {
Ok(())
}
async fn load_command_ids(&self) -> Result<Vec<String>, CamelError> {
Ok(Vec::new())
}
}
#[async_trait]
pub trait CommandDedupPort: Send + Sync {
async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError>;
async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError>;
}
#[async_trait]
pub trait RuntimeUnitOfWorkPort: Send + Sync {
async fn persist_upsert(
&self,
aggregate: RouteRuntimeAggregate,
expected_version: Option<u64>,
projection: RouteStatusProjection,
events: &[RuntimeEvent],
) -> Result<(), CamelError>;
async fn persist_delete(
&self,
route_id: &str,
events: &[RuntimeEvent],
) -> Result<(), CamelError>;
async fn recover_from_journal(&self) -> Result<(), CamelError> {
Ok(())
}
}
#[async_trait]
pub trait RuntimeExecutionPort: Send + Sync {
async fn register_route(&self, definition: RouteDefinition) -> Result<(), CamelError>;
async fn start_route(&self, route_id: &str) -> Result<(), CamelError>;
async fn stop_route(&self, route_id: &str) -> Result<(), CamelError>;
async fn suspend_route(&self, route_id: &str) -> Result<(), CamelError>;
async fn resume_route(&self, route_id: &str) -> Result<(), CamelError>;
async fn reload_route(&self, route_id: &str) -> Result<(), CamelError>;
async fn remove_route(&self, route_id: &str) -> Result<(), CamelError>;
async fn in_flight_count(&self, route_id: &str) -> Result<RuntimeQueryResult, CamelError>;
}
#[cfg(test)]
mod tests {
use super::*;
struct DummyJournal;
#[async_trait]
impl RuntimeEventJournalPort for DummyJournal {
async fn append_batch(&self, _events: &[RuntimeEvent]) -> Result<(), CamelError> {
Ok(())
}
async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError> {
Ok(Vec::new())
}
}
struct DummyUow;
#[async_trait]
impl RuntimeUnitOfWorkPort for DummyUow {
async fn persist_upsert(
&self,
_aggregate: RouteRuntimeAggregate,
_expected_version: Option<u64>,
_projection: RouteStatusProjection,
_events: &[RuntimeEvent],
) -> Result<(), CamelError> {
Ok(())
}
async fn persist_delete(
&self,
_route_id: &str,
_events: &[RuntimeEvent],
) -> Result<(), CamelError> {
Ok(())
}
}
#[tokio::test]
async fn default_journal_methods_are_noop_ok() {
let journal = DummyJournal;
journal.append_command_id("c1").await.unwrap();
journal.remove_command_id("c1").await.unwrap();
assert!(journal.load_command_ids().await.unwrap().is_empty());
}
#[tokio::test]
async fn default_uow_recover_is_noop_ok() {
let uow = DummyUow;
uow.recover_from_journal().await.unwrap();
}
}