dcs2 0.1.0

An extensible distributed control system framework made in rust with no-std support.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
use heapless::{LinearMap, Vec};
use log::*;
use serde::{Deserialize, Serialize};

use crate::communication::connection::{Address, Connection};
use crate::communication::messages::Package;
use crate::communication::messages::{
    IdentificableMessage, SimpleCodifier, MEMBERSHIP_MESSAGE_OPCODE,
};
use crate::communication::{ConnectionError, WriteError};
use crate::membership::metadata::{BasicMetadata, CommunicationMetadata, NodeMetadata};
use crate::membership::MembershipServiceError;
use crate::nodes::SystemNodeId;
use crate::properties::{init_buffer, AddressesBytesVec, MAX_NODE_COUNT};

#[allow(clippy::result_large_err)]
pub type CoordinationPackage<CoordMessage> = Package<SystemNodeId, CoordMessage>;

/// Messages exchanged between the [`MembershipClient`] and the [`MembershipManager`].
#[non_exhaustive]
#[allow(clippy::large_enum_variant)]
#[derive(Eq, PartialEq, Serialize, Deserialize, Debug, Clone)]
pub enum MembershipMessage {
    /// This is a message sent from the node to the server indicating it's still alive.
    KeepAlive(u16),
    /// This is a message sent from the node to the server instructing it to register the node
    /// with the given port sent as an u16.
    RegisterReq(u16),
    /// This is a message sent from the server to the node indicating the register request was
    /// successful
    RegisterResp,
    /// This is a message sent from the node to the server requesting for all the nodes available.
    /// The port sent should be the same sent in [`MembershipMessage::RegisterReq`] or it should be
    /// zero.
    DiscoveryReq(u16),
    /// This is a message sent from the server to the node containing the codified addreses to
    /// be discovered.
    DiscoveryResp(AddressesBytesVec),
}

impl IdentificableMessage for MembershipMessage {
    fn id() -> u8 {
        MEMBERSHIP_MESSAGE_OPCODE
    }
}

pub type MembersAddresses<Addr> = Vec<Addr, MAX_NODE_COUNT>;

/// Client designed to communicate with a Membership service. The members are identified with their
/// lower level addresses.
pub trait MembershipClient {
    type Address: Address;

    /// sends a [`crate::membership::client::MembershipMessage::KeepAlive`] message.
    fn keep_alive(&mut self);
    /// Subscribes as a new member using an implementation specific (address)[`Address`].
    fn subscribe(&mut self) -> Result<(), MembershipServiceError>;
    /// Returns a tuple with the disconnected members as first element and with all members found
    /// as seccond parameter.
    fn find_members(
        &mut self,
    ) -> (
        MembersAddresses<Self::Address>,
        MembersAddresses<Self::Address>,
    );
}

/// Generic tcp/ip implementation of a [`MembershipClient`]
pub struct BaseMembershipClient<
    MsgCod: SimpleCodifier,
    Conn: Connection,
    Addr: Address,
    AddrCod: SimpleCodifier,
> {
    registered_port: u16,
    members_addresses: MembersAddresses<Addr>,
    address_codifier: AddrCod,
    message_codifier: MsgCod,
    connection: Conn,
}

impl<
        MsgCod: SimpleCodifier<Data= MembershipMessage>,
        Conn: Connection,
        Addr: Address,
        AddrCod: SimpleCodifier<Data= MembersAddresses<Addr>>,
    > BaseMembershipClient<MsgCod, Conn, Addr, AddrCod>
{
    pub fn new(
        port: u16,
        address_codifier: AddrCod,
        message_codifier: MsgCod,
        connection: Conn,
    ) -> Self {
        Self {
            registered_port: port,
            members_addresses: Default::default(),
            address_codifier,
            message_codifier,
            connection,
        }
    }

    fn reconnect(&mut self) -> Result<(), ConnectionError> {
        self.connection.reconnect()
    }

    fn send_message(&mut self, message: &MsgCod::Data) -> Result<(), ConnectionError> {
        let mut buffer = init_buffer();
        let bytes = self
            .message_codifier
            .encode(message, &mut buffer)
            .map_err(|err| ConnectionError::WriteError(WriteError::CodificationError(err)))?;

        match self.connection.write(&buffer[..bytes]) {
            Ok(bytes) => {
                trace!("Wrote {:?} bytes to Membership Service.", bytes);
                Ok(())
            }
            Err(err) => {
                error!(
                    "Connection error while writing to the Membership Service: {:?}",
                    err
                );
                Err(err)
            }
        }
    }

    fn receive_message(&mut self) -> Option<MsgCod::Data> {
        let mut buffer = init_buffer();
        if let Ok(bytes) = self.connection.read(&mut buffer) {
            if bytes > 0 {
                return self.message_codifier.decode(&buffer[0..bytes]).ok();
            }
        }
        None
    }

    fn update_members(
        &mut self,
        current_addresses: MembersAddresses<Addr>,
    ) -> (MembersAddresses<Addr>, MembersAddresses<Addr>) {
        let mut removed_addresses = MembersAddresses::<Addr>::new();
        for old_address in self.members_addresses.iter() {
            if !current_addresses.contains(old_address) {
                info!(
                    "A stored address wasn't received from the Discovery service: {}",
                    old_address
                );
                if removed_addresses.push(*old_address).is_err() {
                    warn!("Couldn't add address {} to remove queue.", old_address);
                }
            }
        }
        self.members_addresses
            .retain(|addr| current_addresses.contains(addr));

        let mut new_addresses = MembersAddresses::<Addr>::new();
        for current_address in current_addresses {
            if !self.members_addresses.contains(&current_address)
                && new_addresses.push(current_address).is_err()
            {
                warn!("Couldn't add address {} to add queue.", current_address);
            }
        }
        if self
            .members_addresses
            .extend_from_slice(new_addresses.as_slice())
            .is_err()
        {
            error!(
                "Max capacity reached, couldn't store {} new IPs.",
                new_addresses.len()
            );
        }

        (removed_addresses, new_addresses)
    }
}

impl<
        MsgCod: SimpleCodifier<Data= MembershipMessage>,
        Conn: Connection,
        Addr: Address,
        AddrCod: SimpleCodifier<Data= MembersAddresses<Addr>>,
    > MembershipClient for BaseMembershipClient<MsgCod, Conn, Addr, AddrCod>
{
    type Address = Addr;

    fn keep_alive(&mut self) {
        debug!("Reporting as alive to Membership Service.");
        if self.reconnect().is_err() {
            warn!(
                "Couldn't reconnect to listener. Using previous connection for retrieving members."
            )
        }

        if self
            .send_message(&MembershipMessage::KeepAlive(self.registered_port))
            .is_err()
        {
            error!("Couldn't report node liveness to Membership Service.")
        }
    }

    fn subscribe(&mut self) -> Result<(), MembershipServiceError> {
        let message = MembershipMessage::RegisterReq(self.registered_port);

        match self.reconnect() {
            Ok(_) => self
                .send_message(&message)
                .map_err(MembershipServiceError::ConnectionError),
            Err(err) => {
                error!(
                    "Couldn't establish connection to Membership Service: {:?}",
                    err
                );
                Err(MembershipServiceError::ConnectionError(err))
            }
        }
    }

    fn find_members(
        &mut self,
    ) -> (
        MembersAddresses<Self::Address>,
        MembersAddresses<Self::Address>,
    ) {
        if self.reconnect().is_err() {
            warn!(
                "Couldn't reconnect to listener. Using previous connection for retrieving members."
            );
        }

        trace!("Asking for members!");
        let message = MembershipMessage::DiscoveryReq(self.registered_port);
        if self.send_message(&message).is_ok() {
            if let Some(MembershipMessage::DiscoveryResp(addresses)) = self.receive_message() {
                match self.address_codifier.decode(addresses.as_slice()) {
                    Ok(addresses) => {
                        let (removed_members, new_members) = self.update_members(addresses);

                        let mut found_members = MembersAddresses::<Self::Address>::new();
                        if found_members
                            .extend_from_slice(new_members.as_slice())
                            .is_err()
                        {
                            error!("There where {} new found members from Discovery, exceeding the max capacity.", found_members.len())
                        }

                        let mut disconnected_members = MembersAddresses::<Self::Address>::new();
                        if disconnected_members
                            .extend_from_slice(removed_members.as_slice())
                            .is_err()
                        {
                            error!("There where {} new disconnected members from Discovery, exceeding the max capacity.", disconnected_members.len())
                        }

                        return (disconnected_members, found_members);
                    }

                    Err(err) => {
                        error!("Couldn't deserialize addresses: {:?}", err);
                    }
                }
            }
        };

        warn!("Couldn't retrieve response from MembershipService. Using previous members.");
        (
            MembersAddresses::<Self::Address>::default(),
            self.members_addresses.clone(),
        )
    }
}

/// Like a [`MembershipClient`] but also identifies the nodes by their [`SystemNodeId`] using an
/// asociated `NodeMetadata`.
pub trait MembershipManager<const SIZE: usize>: MembershipClient {
    type NodeMetadata: BasicMetadata;

    /// Declares a member
    fn register_member(&mut self, node_id: SystemNodeId, metadata: Self::NodeMetadata);
    fn unregister_member(&mut self, node_id: SystemNodeId);
    fn members(&self) -> LinearMap<SystemNodeId, Self::NodeMetadata, SIZE>;
    fn select_member(&self, select: fn(&Self::NodeMetadata) -> bool) -> Option<SystemNodeId>;
}

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct NodeMembershipEntry<CoordMetadata: BasicMetadata, CommMetadata: CommunicationMetadata> {
    pub id: SystemNodeId,
    pub metadata: NodeMetadata<CoordMetadata, CommMetadata>,
}

#[cfg(test)]
mod test {
    use crate::communication::connection::{Connection, Read, Write};
    use crate::communication::messages::SimpleCodifier;
    use crate::communication::ConnectionError;
    use crate::membership::client::{
        BaseMembershipClient, MembersAddresses, MembershipClient, MembershipMessage,
    };
    use crate::CodificationError;
    use crate::properties::AddressesBytesVec;

    type FakeAddress = u16;

    #[derive(Default)]
    struct FakeConnection;

    impl Read for FakeConnection {
        fn read(&mut self, _buf: &mut [u8]) -> Result<usize, ConnectionError> {
            Ok(1)
        }
    }

    impl Write for FakeConnection {
        fn write(&mut self, _buf: &[u8]) -> Result<usize, ConnectionError> {
            Ok(1)
        }
    }

    impl Connection for FakeConnection {
        fn reconnect(&mut self) -> Result<(), ConnectionError> {
            Ok(())
        }
    }

    struct FakeMessageCodifier {
        message: MembershipMessage,
    }

    impl FakeMessageCodifier {
        pub fn with_message(message: MembershipMessage) -> Self {
            Self { message }
        }
    }

    impl Default for FakeMessageCodifier {
        fn default() -> Self {
            Self {
                message: MembershipMessage::RegisterResp,
            }
        }
    }

    impl SimpleCodifier for FakeMessageCodifier {
        type Data = MembershipMessage;

        fn decode(&self, _input: &[u8]) -> Result<Self::Data, CodificationError> {
            Ok(self.message.clone())
        }

        fn encode(
            &self,
            _msg: &Self::Data,
            _buffer: &mut [u8],
        ) -> Result<usize, CodificationError> {
            Ok(1)
        }
    }

    #[derive(Default)]
    struct FakeAddressCodifier {
        addresses: MembersAddresses<FakeAddress>,
    }

    impl FakeAddressCodifier {
        fn with_addresses(addresses: MembersAddresses<FakeAddress>) -> Self {
            Self { addresses }
        }
    }

    impl SimpleCodifier for FakeAddressCodifier {
        type Data = MembersAddresses<FakeAddress>;

        fn decode(&self, _input: &[u8]) -> Result<Self::Data, CodificationError> {
            Ok(self.addresses.clone())
        }

        fn encode(
            &self,
            _msg: &Self::Data,
            _buffer: &mut [u8],
        ) -> Result<usize, CodificationError> {
            Ok(1)
        }
    }

    #[test]
    fn membership_manager_can_get_new_members() {
        let address: u16 = 1;
        let connection = FakeConnection::default();

        let mut addresses = MembersAddresses::new();
        addresses.push(address).unwrap();

        let address_codifier = FakeAddressCodifier::with_addresses(addresses);
        let message_codifier = FakeMessageCodifier::with_message(MembershipMessage::DiscoveryResp(AddressesBytesVec::default()));

        let mut membership_client =
            BaseMembershipClient::new(0, address_codifier, message_codifier, connection);
        let (_, new_members) = membership_client.find_members();
        assert!(new_members.contains(&address))
    }
}