use crate::runtime::RuntimeHandle;
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Clone)]
pub struct ProducerContext {
runtime: Option<Arc<dyn RuntimeHandle>>,
default_headers: HashMap<String, String>,
timeout_ms: Option<u64>,
}
impl ProducerContext {
pub fn new() -> Self {
Self {
runtime: None,
default_headers: HashMap::new(),
timeout_ms: None,
}
}
pub fn with_runtime(mut self, runtime: Arc<dyn RuntimeHandle>) -> Self {
self.runtime = Some(runtime);
self
}
pub fn runtime(&self) -> Option<&Arc<dyn RuntimeHandle>> {
self.runtime.as_ref()
}
pub fn with_default_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.default_headers.insert(key.into(), value.into());
self
}
pub fn default_headers(&self) -> &HashMap<String, String> {
&self.default_headers
}
pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
self.timeout_ms = Some(timeout_ms);
self
}
pub fn timeout_ms(&self) -> Option<u64> {
self.timeout_ms
}
}
impl Default for ProducerContext {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::CamelError;
use crate::runtime::{
RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery, RuntimeQueryBus,
RuntimeQueryResult,
};
use async_trait::async_trait;
use futures::executor::block_on;
struct NoopRuntime;
#[async_trait]
impl RuntimeCommandBus for NoopRuntime {
async fn execute(&self, _cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
Ok(RuntimeCommandResult::Accepted)
}
}
#[async_trait]
impl RuntimeQueryBus for NoopRuntime {
async fn ask(&self, _query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
Ok(RuntimeQueryResult::Routes { route_ids: vec![] })
}
}
#[test]
fn producer_context_new_is_empty() {
let ctx = ProducerContext::new();
assert!(ctx.runtime().is_none());
}
#[test]
fn producer_context_default_is_empty() {
let ctx = ProducerContext::default();
assert!(ctx.runtime().is_none());
}
#[test]
fn producer_context_with_runtime_sets_handle() {
let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
let ctx = ProducerContext::new().with_runtime(runtime.clone());
let attached = ctx.runtime().expect("runtime should be set");
assert!(Arc::ptr_eq(attached, &runtime));
}
#[test]
fn producer_context_clone_keeps_same_runtime_handle() {
let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
let ctx = ProducerContext::new().with_runtime(runtime.clone());
let cloned = ctx.clone();
assert!(Arc::ptr_eq(cloned.runtime().unwrap(), &runtime));
}
#[test]
fn producer_context_with_runtime_can_execute_command() {
let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
let ctx = ProducerContext::new().with_runtime(runtime);
let result = block_on(ctx.runtime().unwrap().execute(RuntimeCommand::StartRoute {
route_id: "r1".into(),
command_id: "c1".into(),
causation_id: None,
}))
.unwrap();
assert_eq!(result, RuntimeCommandResult::Accepted);
}
#[test]
fn producer_context_with_runtime_can_execute_query() {
let runtime: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
let ctx = ProducerContext::new().with_runtime(runtime);
let result = block_on(ctx.runtime().unwrap().ask(RuntimeQuery::ListRoutes)).unwrap();
assert_eq!(result, RuntimeQueryResult::Routes { route_ids: vec![] });
}
#[test]
fn producer_context_with_runtime_replaces_previous_runtime() {
let first: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
let second: Arc<dyn RuntimeHandle> = Arc::new(NoopRuntime);
let ctx = ProducerContext::new()
.with_runtime(first)
.with_runtime(second.clone());
assert!(Arc::ptr_eq(ctx.runtime().unwrap(), &second));
}
#[test]
fn producer_context_default_headers_empty() {
let ctx = ProducerContext::new();
assert!(ctx.default_headers().is_empty());
}
#[test]
fn producer_context_with_default_header() {
let ctx = ProducerContext::new()
.with_default_header("X-Source", "camel")
.with_default_header("X-Trace", "enabled");
let headers = ctx.default_headers();
assert_eq!(headers.get("X-Source").unwrap(), "camel");
assert_eq!(headers.get("X-Trace").unwrap(), "enabled");
}
#[test]
fn producer_context_timeout_ms() {
let ctx = ProducerContext::new();
assert!(ctx.timeout_ms().is_none());
let ctx = ctx.with_timeout_ms(5000);
assert_eq!(ctx.timeout_ms(), Some(5000));
}
}