ockam_entity 0.35.0

Ockam is a library for building devices that communicate securely, privately and trustfully with cloud services and other devices.
Documentation
use crate::ProfileIdentifier;
use ockam_core::{async_trait, compat::boxed::Box};
use ockam_core::{AccessControl, LocalMessage, Result};

mod secure_channel_worker;
pub(crate) use secure_channel_worker::*;
mod listener;
pub(crate) use listener::*;
mod messages;
pub(crate) use messages::*;
mod trust_policy;
pub use trust_policy::*;
mod local_info;
pub use local_info::*;

pub struct EntityAccessControlBuilder;

impl EntityAccessControlBuilder {
    pub fn new_with_id(their_profile_id: ProfileIdentifier) -> EntityIdAccessControl {
        EntityIdAccessControl { their_profile_id }
    }

    pub fn new_with_any_id() -> EntityAnyIdAccessControl {
        EntityAnyIdAccessControl
    }
}

pub struct EntityAnyIdAccessControl;

#[async_trait]
impl AccessControl for EntityAnyIdAccessControl {
    async fn msg_is_authorized(&mut self, local_msg: &LocalMessage) -> Result<bool> {
        Ok(EntitySecureChannelLocalInfo::find_info(local_msg).is_ok())
    }
}

pub struct EntityIdAccessControl {
    their_profile_id: ProfileIdentifier,
}

#[async_trait]
impl AccessControl for EntityIdAccessControl {
    async fn msg_is_authorized(&mut self, local_msg: &LocalMessage) -> Result<bool> {
        if let Ok(msg_profile_id) = EntitySecureChannelLocalInfo::find_info(local_msg) {
            Ok(msg_profile_id.their_profile_id() == &self.their_profile_id)
        } else {
            Ok(false)
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{Entity, Identity};
    use core::sync::atomic::{AtomicU8, Ordering};
    use ockam_core::compat::sync::Arc;
    use ockam_core::{route, Any, Route, Routed, Worker};
    use ockam_node::Context;
    use ockam_vault_sync_core::Vault;
    use std::convert::TryInto;
    use std::time::Duration;
    use tokio::time::sleep;

    #[ockam_macros::test]
    async fn test_channel(ctx: &mut Context) -> Result<()> {
        let alice_vault = Vault::create(ctx).await.expect("failed to create vault");
        let bob_vault = Vault::create(ctx).await.expect("failed to create vault");

        let mut alice = Entity::create(ctx, &alice_vault).await?;
        let mut bob = Entity::create(ctx, &bob_vault).await?;

        let alice_trust_policy = TrustIdentifierPolicy::new(bob.identifier().await?);
        let bob_trust_policy = TrustIdentifierPolicy::new(alice.identifier().await?);

        bob.create_secure_channel_listener("bob_listener", bob_trust_policy)
            .await?;

        let alice_channel = alice
            .create_secure_channel(route!["bob_listener"], alice_trust_policy)
            .await?;

        ctx.send(
            route![alice_channel, ctx.address()],
            "Hello, Bob!".to_string(),
        )
        .await?;
        let msg = ctx.receive::<String>().await?.take();

        let local_info = EntitySecureChannelLocalInfo::find_info(msg.local_message())?;
        assert_eq!(local_info.their_profile_id(), &alice.identifier().await?);

        let return_route = msg.return_route();
        assert_eq!("Hello, Bob!", msg.body());

        ctx.send(return_route, "Hello, Alice!".to_string()).await?;

        let msg = ctx.receive::<String>().await?.take();

        let local_info = EntitySecureChannelLocalInfo::find_info(msg.local_message())?;
        assert_eq!(local_info.their_profile_id(), &bob.identifier().await?);

        assert_eq!("Hello, Alice!", msg.body());

        ctx.stop().await
    }

    #[ockam_macros::test]
    async fn test_tunneled_secure_channel_works(ctx: &mut Context) -> Result<()> {
        let vault = Vault::create(ctx).await?;

        let mut alice = Entity::create(ctx, &vault).await?;
        let mut bob = Entity::create(ctx, &vault).await?;

        let alice_trust_policy = TrustIdentifierPolicy::new(bob.identifier().await?);
        let bob_trust_policy = TrustIdentifierPolicy::new(alice.identifier().await?);

        bob.create_secure_channel_listener("bob_listener", bob_trust_policy.clone())
            .await?;

        let alice_channel = alice
            .create_secure_channel(route!["bob_listener"], alice_trust_policy.clone())
            .await?;

        bob.create_secure_channel_listener("bob_another_listener", bob_trust_policy)
            .await?;

        let alice_another_channel = alice
            .create_secure_channel(
                route![alice_channel, "bob_another_listener"],
                alice_trust_policy,
            )
            .await?;

        ctx.send(
            route![alice_another_channel, ctx.address()],
            "Hello, Bob!".to_string(),
        )
        .await?;
        let msg = ctx.receive::<String>().await?.take();
        let return_route = msg.return_route();
        assert_eq!("Hello, Bob!", msg.body());

        ctx.send(return_route, "Hello, Alice!".to_string()).await?;
        assert_eq!(
            "Hello, Alice!",
            ctx.receive::<String>().await?.take().body()
        );

        ctx.stop().await
    }

    #[ockam_macros::test]
    async fn test_double_tunneled_secure_channel_works(ctx: &mut Context) -> Result<()> {
        let vault = Vault::create(ctx).await?;

        let mut alice = Entity::create(ctx, &vault).await?;
        let mut bob = Entity::create(ctx, &vault).await?;

        let alice_trust_policy = TrustIdentifierPolicy::new(bob.identifier().await?);
        let bob_trust_policy = TrustIdentifierPolicy::new(alice.identifier().await?);

        bob.create_secure_channel_listener("bob_listener", bob_trust_policy.clone())
            .await?;

        let alice_channel = alice
            .create_secure_channel(route!["bob_listener"], alice_trust_policy.clone())
            .await?;

        bob.create_secure_channel_listener("bob_another_listener", bob_trust_policy.clone())
            .await?;

        let alice_another_channel = alice
            .create_secure_channel(
                route![alice_channel, "bob_another_listener"],
                alice_trust_policy.clone(),
            )
            .await?;

        bob.create_secure_channel_listener("bob_yet_another_listener", bob_trust_policy)
            .await?;

        let alice_yet_another_channel = alice
            .create_secure_channel(
                route![alice_another_channel, "bob_yet_another_listener"],
                alice_trust_policy,
            )
            .await?;

        ctx.send(
            route![alice_yet_another_channel, ctx.address()],
            "Hello, Bob!".to_string(),
        )
        .await?;
        let msg = ctx.receive::<String>().await?.take();
        let return_route = msg.return_route();
        assert_eq!("Hello, Bob!", msg.body());

        ctx.send(return_route, "Hello, Alice!".to_string()).await?;
        assert_eq!(
            "Hello, Alice!",
            ctx.receive::<String>().await?.take().body()
        );

        ctx.stop().await
    }

    #[ockam_macros::test]
    async fn test_many_times_tunneled_secure_channel_works(ctx: &mut Context) -> Result<()> {
        let vault = Vault::create(ctx).await?;

        let mut alice = Entity::create(ctx, &vault).await?;
        let mut bob = Entity::create(ctx, &vault).await?;

        let alice_trust_policy = TrustIdentifierPolicy::new(bob.identifier().await?);
        let bob_trust_policy = TrustIdentifierPolicy::new(alice.identifier().await?);

        let n = rand::random::<u8>() % 5 + 4;
        let mut channels = vec![];
        for i in 0..n {
            bob.create_secure_channel_listener(i.to_string(), bob_trust_policy.clone())
                .await?;
            let channel_route: Route;
            if i > 0 {
                channel_route = route![channels.pop().unwrap(), i.to_string()];
            } else {
                channel_route = route![i.to_string()];
            }
            let alice_channel = alice
                .create_secure_channel(channel_route, alice_trust_policy.clone())
                .await?;
            channels.push(alice_channel);
        }

        ctx.send(
            route![channels.pop().unwrap(), ctx.address()],
            "Hello, Bob!".to_string(),
        )
        .await?;
        let msg = ctx.receive::<String>().await?.take();
        let return_route = msg.return_route();
        assert_eq!("Hello, Bob!", msg.body());

        ctx.send(return_route, "Hello, Alice!".to_string()).await?;
        assert_eq!(
            "Hello, Alice!",
            ctx.receive::<String>().await?.take().body()
        );

        ctx.stop().await
    }

    struct Receiver {
        received_count: Arc<AtomicU8>,
    }

    #[ockam_core::async_trait]
    impl Worker for Receiver {
        type Message = Any;
        type Context = Context;

        async fn handle_message(
            &mut self,
            _context: &mut Self::Context,
            _msg: Routed<Self::Message>,
        ) -> Result<()> {
            self.received_count.fetch_add(1, Ordering::Relaxed);

            Ok(())
        }
    }

    #[allow(non_snake_case)]
    #[ockam_macros::test]
    async fn access_control__known_participant__should_pass_messages(
        ctx: &mut Context,
    ) -> Result<()> {
        let received_count = Arc::new(AtomicU8::new(0));
        let receiver = Receiver {
            received_count: received_count.clone(),
        };

        let vault = Vault::create(ctx).await?;

        let mut alice = Entity::create(ctx, &vault).await?;
        let mut bob = Entity::create(ctx, &vault).await?;

        let access_control = EntityAccessControlBuilder::new_with_id(alice.identifier().await?);
        ctx.start_worker_with_access_control("receiver", receiver, access_control)
            .await?;

        bob.create_secure_channel_listener("listener", TrustEveryonePolicy)
            .await?;

        let alice_channel = alice
            .create_secure_channel("listener", TrustEveryonePolicy)
            .await?;

        ctx.send(route![alice_channel, "receiver"], "Hello, Bob!".to_string())
            .await?;

        sleep(Duration::from_secs(1)).await;

        assert_eq!(received_count.load(Ordering::Relaxed), 1);

        ctx.stop().await
    }

    #[allow(non_snake_case)]
    #[ockam_macros::test]
    async fn access_control__unknown_participant__should_not_pass_messages(
        ctx: &mut Context,
    ) -> Result<()> {
        let received_count = Arc::new(AtomicU8::new(0));
        let receiver = Receiver {
            received_count: received_count.clone(),
        };

        let vault = Vault::create(ctx).await?;

        let mut alice = Entity::create(ctx, &vault).await?;
        let mut bob = Entity::create(ctx, &vault).await?;

        let access_control = EntityAccessControlBuilder::new_with_id(bob.identifier().await?);
        ctx.start_worker_with_access_control("receiver", receiver, access_control)
            .await?;

        bob.create_secure_channel_listener("listener", TrustEveryonePolicy)
            .await?;

        let alice_channel = alice
            .create_secure_channel("listener", TrustEveryonePolicy)
            .await?;

        ctx.send(route![alice_channel, "receiver"], "Hello, Bob!".to_string())
            .await?;

        sleep(Duration::from_secs(1)).await;

        assert_eq!(received_count.load(Ordering::Relaxed), 0);

        ctx.stop().await
    }

    #[allow(non_snake_case)]
    #[ockam_macros::test]
    async fn access_control__no_secure_channel__should_not_pass_messages(
        ctx: &mut Context,
    ) -> Result<()> {
        let received_count = Arc::new(AtomicU8::new(0));
        let receiver = Receiver {
            received_count: received_count.clone(),
        };

        let access_control = EntityAccessControlBuilder::new_with_id(
            "P79b26ba2ea5ad9b54abe5bebbcce7c446beda8c948afc0de293250090e5270b6".try_into()?,
        );
        ctx.start_worker_with_access_control("receiver", receiver, access_control)
            .await?;

        ctx.send(route!["receiver"], "Hello, Bob!".to_string())
            .await?;

        sleep(Duration::from_secs(1)).await;

        assert_eq!(received_count.load(Ordering::Relaxed), 0);

        ctx.stop().await
    }
}