dcs2/nodes/
utils.rs

1use log::info;
2
3use crate::communication::messages::{IdentificableMessage, SimpleCodifier, Messages, ControlMessage};
4use crate::communication::service::GenericCommunicationService;
5use crate::membership::client::{MembershipClient, MembershipManager};
6use crate::membership::metadata::{BasicMetadata, NodeMetadata};
7use crate::nodes::CommunicationSDK;
8use crate::properties::{EncodedMetadata, init_buffer, SystemBufferVec};
9
10pub type SystemCommunicationService<CommsSDK> = GenericCommunicationService<
11    <CommsSDK as CommunicationSDK>::Connection,
12    <CommsSDK as CommunicationSDK>::Address,
13    <CommsSDK as CommunicationSDK>::Factory,
14    <CommsSDK as CommunicationSDK>::Router,
15>;
16
17pub fn connect_with_members<
18    Msg: IdentificableMessage,
19    CommsSDK: CommunicationSDK,
20    CoordMetadata: BasicMetadata,
21    Membership: MembershipManager<SIZE, NodeMetadata = NodeMetadata<CoordMetadata, CommsSDK::Metadata>>,
22    MsgCod: SimpleCodifier<Data= Messages<Msg>>,
23    const SIZE: usize,
24>(
25    metadata: EncodedMetadata,
26    membership: &mut Membership,
27    communication: &mut SystemCommunicationService<CommsSDK>,
28    codifier: &mut MsgCod,
29) where
30    Membership: MembershipClient<Address = CommsSDK::Address>,
31{
32    info!("Connecting to network members.");
33    let message: Messages<Msg> = Messages::System(ControlMessage::GetMetadata(metadata));
34
35    let (removed_members, new_members) = membership.find_members();
36    info!(
37        "There are {} new members and {} removed members in the network.",
38        new_members.len(),
39        removed_members.len()
40    );
41
42    for removed_member in removed_members {
43        if let Some(old_id) = communication.unpeer(removed_member) {
44            membership.unregister_member(old_id);
45        }
46    }
47
48    let mut buffer = init_buffer();
49    let codified = codifier.encode(&message, &mut buffer).unwrap();
50
51    if let Ok(bytes) = SystemBufferVec::from_slice(&buffer[..codified]) {
52        for new_member in new_members {
53            info!("Establishing handshake with node at {}.", new_member);
54            communication.handshake(new_member, bytes.clone())
55        }
56    }
57}