1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::remote::{RemoteRelay, RemoteRelayInfo};
use crate::{Context, OckamError};
use ockam_core::compat::{
    boxed::Box,
    string::{String, ToString},
    vec::Vec,
};
use ockam_core::{Any, Decodable, Result, Routed, Worker};
use tracing::{debug, info};

#[crate::worker]
impl Worker for RemoteRelay {
    type Context = Context;
    type Message = Any;

    async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
        debug!("RemoteRelay registration...");

        ctx.send_from_address(
            self.registration_route.clone(),
            self.registration_payload.clone(),
            self.addresses.main_remote.clone(),
        )
        .await?;

        Ok(())
    }

    async fn handle_message(
        &mut self,
        ctx: &mut Context,
        msg: Routed<Self::Message>,
    ) -> Result<()> {
        if msg.msg_addr() == self.addresses.heartbeat {
            // Heartbeat message, send registration message
            ctx.send_from_address(
                self.registration_route.clone(),
                self.registration_payload.clone(),
                self.addresses.main_remote.clone(),
            )
            .await?;

            if let Some(heartbeat) = &mut self.heartbeat {
                heartbeat.schedule(self.heartbeat_interval).await?;
            }

            Ok(())
        } else if msg.msg_addr() == self.addresses.main_remote {
            let return_route = msg.return_route();
            let mut message = msg.into_local_message();
            let transport_message = message.transport_mut();

            // Remove my address from the onward_route
            transport_message.onward_route.step()?;

            match transport_message.onward_route.next() {
                Err(_) => {
                    debug!("RemoteRelay received service message");

                    let payload = Vec::<u8>::decode(&transport_message.payload)
                        .map_err(|_| OckamError::InvalidHubResponse)?;
                    let payload =
                        String::from_utf8(payload).map_err(|_| OckamError::InvalidHubResponse)?;
                    // using ends_with() instead of == to allow for prefixes
                    if !payload.ends_with(&self.registration_payload) {
                        return Err(OckamError::InvalidHubResponse)?;
                    }

                    if !self.completion_msg_sent {
                        info!("RemoteRelay registered with route: {}", return_route);
                        let address = match return_route.recipient()?.to_string().strip_prefix("0#")
                        {
                            Some(addr) => addr.to_string(),
                            None => return Err(OckamError::InvalidHubResponse)?,
                        };

                        ctx.send_from_address(
                            self.addresses.completion_callback.clone(),
                            RemoteRelayInfo::new(
                                return_route,
                                address,
                                self.addresses.main_remote.clone(),
                                self.flow_control_id.clone(),
                            ),
                            self.addresses.main_remote.clone(),
                        )
                        .await?;

                        self.completion_msg_sent = true;
                    }

                    if let Some(heartbeat) = &mut self.heartbeat {
                        heartbeat.schedule(self.heartbeat_interval).await?;
                    }

                    Ok(())
                }
                Ok(next) if next == &self.addresses.main_remote => {
                    // Explicitly check that we don't forward to ourselves as this would somewhat
                    // overcome our outgoing access control, even though it shouldn't be possible
                    // to exploit it in any way
                    return Err(OckamError::UnknownForwarderNextHopAddress)?;
                }
                Ok(_) => {
                    // Forwarding the message
                    debug!("RemoteRelay received payload message");

                    // Send the message on its onward_route
                    ctx.forward_from_address(message, self.addresses.main_internal.clone())
                        .await?;

                    // We received message from the other node, our registration is still alive, let's reset
                    // heartbeat timer
                    if let Some(heartbeat) = &mut self.heartbeat {
                        heartbeat.schedule(self.heartbeat_interval).await?;
                    }

                    Ok(())
                }
            }
        } else {
            Err(OckamError::UnknownForwarderDestinationAddress)?
        }
    }
}