1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use crate::{pipe2::PipeSystem, Context, OckamError, OckamMessage};
use ockam_core::{
    compat::boxed::Box, Address, Any, Decodable, LocalMessage, Result, Route, Routed,
    TransportMessage, Worker,
};

#[allow(unused)]
pub struct PipeReceiver {
    system: PipeSystem,
    fin_addr: Address,
    init_addr: Option<Address>,
}

#[crate::worker]
impl Worker for PipeReceiver {
    type Context = Context;
    type Message = OckamMessage;

    async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
        ctx.set_cluster(crate::pipe2::CLUSTER_NAME).await?;
        if self.init_addr.is_some() {
            debug!(
                "PipeReceiver '{}' waiting for initialisation message",
                ctx.address()
            );
        }
        Ok(())
    }

    async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<OckamMessage>) -> Result<()> {
        debug!(
            "PipeReceiver: received message to address: {}",
            msg.msg_addr()
        );

        match (msg.msg_addr(), &self.init_addr) {
            (ref addr, Some(ref init)) if addr == init => {
                let peer_route = match msg.body().scope.get(0) {
                    Some(data) => Route::decode(data)?,
                    None => return Err(OckamError::InvalidParameter.into()),
                };
                trace!("Successfully initialised PipeReceiver!");
                ctx.send(peer_route, OckamMessage::new(Any)?).await
            }

            // Messagess addressed to the "fin" address can simply be forwarded
            (addr, _) if addr == self.fin_addr => {
                let inner: TransportMessage = msg.body().data()?;
                ctx.forward(LocalMessage::new(inner, vec![])).await
            }

            // For any other address we pass the message to the worker system
            (addr, _) => {
                // If the system is empty we can skip right to "fin"
                if self.system.is_empty() {
                    let inner: TransportMessage = msg.body().data()?;
                    ctx.forward(LocalMessage::new(inner, vec![])).await
                }
                // Otherwise we submit to the system
                else if addr == ctx.address() {
                    trace!(
                        "Initial dispatch to worker system: {:?}",
                        self.system.entrypoint()
                    );
                    if let Err(e) = self.system.dispatch_entry(ctx, msg).await {
                        error!("Dispatch entry error: {}", e);
                        return Err(e);
                    }
                    Ok(())
                } else {
                    trace!("Forwarding message to worker system: {}", addr);
                    self.system.handle_message(ctx, msg).await
                }
            }
        }
    }
}

impl PipeReceiver {
    pub fn new(system: PipeSystem, fin_addr: Address, init_addr: Option<Address>) -> Self {
        Self {
            system,
            fin_addr,
            init_addr,
        }
    }
}