camel-master 0.7.0

Master crate for Rust Camel, re-exporting all public APIs of the workspace
Documentation
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use camel_api::{
    CamelError, Exchange, LeaderElector, LeadershipEvent, LeadershipHandle, Message, NoOpMetrics,
    NoopLeaderElector, PlatformError, PlatformIdentity,
};
use camel_component_api::{
    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint, ProducerContext,
};
use camel_language_api::Language;
use camel_master::MasterComponent;
use tokio::sync::{oneshot, watch};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

struct TestComponentContext {
    delegate: Arc<dyn Component>,
    elector: Arc<dyn LeaderElector>,
}

impl ComponentContext for TestComponentContext {
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
        if self.delegate.scheme() == scheme {
            Some(Arc::clone(&self.delegate))
        } else {
            None
        }
    }

    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
        None
    }

    fn metrics(&self) -> Arc<dyn camel_api::MetricsCollector> {
        Arc::new(NoOpMetrics)
    }

    fn leader_elector(&self) -> Arc<dyn LeaderElector> {
        Arc::clone(&self.elector)
    }
}

struct TestDelegateComponent;

impl Component for TestDelegateComponent {
    fn scheme(&self) -> &str {
        "test"
    }

    fn create_endpoint(
        &self,
        uri: &str,
        _ctx: &dyn ComponentContext,
    ) -> Result<Box<dyn Endpoint>, CamelError> {
        Ok(Box::new(TestDelegateEndpoint {
            uri: uri.to_string(),
        }))
    }
}

struct TestDelegateEndpoint {
    uri: String,
}

impl Endpoint for TestDelegateEndpoint {
    fn uri(&self) -> &str {
        &self.uri
    }

    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
        Ok(Box::new(TestDelegateConsumer))
    }

    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
        Err(CamelError::EndpointCreationFailed("not used".to_string()))
    }
}

struct TestDelegateConsumer;

#[async_trait]
impl Consumer for TestDelegateConsumer {
    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        context
            .send(Exchange::new(Message::new("delegate-ok")))
            .await?;
        context.cancelled().await;
        Ok(())
    }

    async fn stop(&mut self) -> Result<(), CamelError> {
        Ok(())
    }
}

struct FakeLeaderElector {
    seen_node_id: Arc<Mutex<Option<String>>>,
}

impl FakeLeaderElector {
    fn new() -> Self {
        Self {
            seen_node_id: Arc::new(Mutex::new(None)),
        }
    }

    fn seen_node_id(&self) -> Option<String> {
        self.seen_node_id
            .lock()
            .expect("fake elector mutex poisoned")
            .clone()
    }
}

#[async_trait]
impl LeaderElector for FakeLeaderElector {
    async fn start(&self, identity: PlatformIdentity) -> Result<LeadershipHandle, PlatformError> {
        *self
            .seen_node_id
            .lock()
            .expect("fake elector mutex poisoned") = Some(identity.node_id);

        let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
        drop(tx);
        let (_term_tx, term_rx) = oneshot::channel();

        Ok(LeadershipHandle::new(
            rx,
            Arc::new(AtomicBool::new(true)),
            CancellationToken::new(),
            term_rx,
        ))
    }
}

#[tokio::test]
async fn works_with_noop_leader_elector() {
    let delegate = Arc::new(TestDelegateComponent);
    let ctx = TestComponentContext {
        delegate,
        elector: Arc::new(NoopLeaderElector),
    };

    let component = MasterComponent::default();
    let endpoint = component
        .create_endpoint("master:leader-lock:test:delegate", &ctx)
        .unwrap();
    let mut consumer = endpoint.create_consumer().unwrap();

    let (tx, mut rx) = tokio::sync::mpsc::channel(8);
    let cancel = CancellationToken::new();
    let consumer_ctx = ConsumerContext::new(tx, cancel.clone());

    consumer.start(consumer_ctx).await.unwrap();

    let first = timeout(Duration::from_millis(300), rx.recv())
        .await
        .unwrap()
        .unwrap();
    assert_eq!(first.exchange.input.body.as_text(), Some("delegate-ok"));

    cancel.cancel();
    consumer.stop().await.unwrap();
}

#[tokio::test]
async fn lock_name_maps_to_lease_name() {
    let delegate = Arc::new(TestDelegateComponent);
    let fake = Arc::new(FakeLeaderElector::new());
    let ctx = TestComponentContext {
        delegate,
        elector: fake.clone(),
    };

    let component = MasterComponent::default();
    let endpoint = component
        .create_endpoint("master:lease-name:test:delegate", &ctx)
        .unwrap();
    let mut consumer = endpoint.create_consumer().unwrap();

    let (tx, _rx) = tokio::sync::mpsc::channel(8);
    let cancel = CancellationToken::new();
    let consumer_ctx = ConsumerContext::new(tx, cancel.clone());

    consumer.start(consumer_ctx).await.unwrap();

    assert_eq!(fake.seen_node_id().as_deref(), Some("lease-name"));

    cancel.cancel();
    consumer.stop().await.unwrap();
}