flare_dht/
cluster.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use crate::error::FlareError;
use crate::metadata::MetadataManager;
use crate::pool::{ControlPool, DataPool};
use crate::shard::{KvShard, ShardManager};
use crate::NodeId;
use flare_pb::flare_control_client::FlareControlClient;
use flare_pb::JoinRequest;

use std::error::Error;
use std::str::FromStr;
use std::sync::Arc;
use tokio_stream::StreamExt;
use tonic::transport::{Channel, Uri};
use tracing::info;

pub struct FlareNode<T>
where
    T: KvShard,
{
    pub metadata_manager: Arc<dyn MetadataManager>,
    pub addr: String,
    pub node_id: NodeId,
    pub control_pool: Arc<ControlPool>,
    pub data_pool: Arc<DataPool>,
    pub shard_manager: Arc<ShardManager<T>>,
    close_signal_sender: tokio::sync::watch::Sender<bool>,
    close_signal_receiver: tokio::sync::watch::Receiver<bool>,
}

impl<T> FlareNode<T>
where
    T: KvShard + 'static,
{
    pub async fn new(
        addr: String,
        node_id: NodeId,
        metadata_manager: Arc<dyn MetadataManager>,
        shard_manager: Arc<ShardManager<T>>,
        control_pool: Arc<ControlPool>,
        data_pool: Arc<DataPool>,
    ) -> Self {
        let (tx, rx) = tokio::sync::watch::channel(false);
        FlareNode {
            metadata_manager: metadata_manager,
            addr,
            node_id,
            shard_manager: shard_manager,
            control_pool,
            data_pool,
            close_signal_sender: tx,
            close_signal_receiver: rx,
        }
    }

    pub fn start_watch_stream(self: Arc<Self>) {
        let mut rs = tokio_stream::wrappers::WatchStream::new(
            self.metadata_manager.create_watch(),
        );
        tokio::spawn(async move {
            let mut last_sync = 0;
            loop {
                if let Some(log_id) = rs.next().await {
                    if log_id > last_sync {
                        last_sync = log_id;
                        let local_shards =
                            self.metadata_manager.local_shards().await;
                        self.shard_manager.sync_shards(&local_shards);
                    }
                    if self.close_signal_receiver.has_changed().unwrap_or(true)
                    {
                        info!("closed watch loop");
                        break;
                    }
                }
            }
        });
    }

    pub async fn join(&self, peer_addr: &str) -> Result<(), Box<dyn Error>> {
        info!("advertise addr {}", self.addr);
        let peer_addr: Uri = Uri::from_str(peer_addr)?;
        let channel = Channel::builder(peer_addr).connect_lazy();
        let mut client = FlareControlClient::new(channel);
        let resp = client
            .join(JoinRequest {
                node_id: self.node_id,
                addr: self.addr.clone(),
            })
            .await?;
        resp.into_inner();
        Ok(())
    }

    pub async fn leave(&self) {
        self.metadata_manager.leave().await;
        info!("flare leave group");
    }

    pub async fn close(&self) {
        self.leave().await;
        self.close_signal_sender.send(true).unwrap();
    }

    pub async fn get_shard(
        &self,
        collection: &str,
        key: &[u8],
    ) -> Result<Arc<T>, FlareError> {
        let option = self.metadata_manager.get_shard_id(collection, key).await;
        if let Some(shard_id) = option {
            self.shard_manager.get_shard(shard_id)
        } else {
            Err(FlareError::NoCollection(collection.into()))
        }
    }
}