flare_dht/
cluster.rs

1use crate::error::FlareError;
2use crate::metadata::MetadataManager;
3use crate::shard::{KvShard, ShardManager};
4use crate::NodeId;
5use flare_pb::flare_control_client::FlareControlClient;
6use flare_pb::JoinRequest;
7
8use std::error::Error;
9use std::str::FromStr;
10use std::sync::Arc;
11use tokio_stream::StreamExt;
12use tonic::transport::{Channel, Uri};
13use tracing::info;
14
15pub struct FlareNode<T>
16where
17    T: KvShard,
18{
19    pub metadata_manager: Arc<dyn MetadataManager>,
20    pub addr: String,
21    pub node_id: NodeId,
22    // pub control_pool: Arc<ControlPool>,
23    // pub data_pool: Arc<DataPool>,
24    pub shard_manager: Arc<ShardManager<T>>,
25    close_signal_sender: tokio::sync::watch::Sender<bool>,
26    close_signal_receiver: tokio::sync::watch::Receiver<bool>,
27}
28
29impl<T> FlareNode<T>
30where
31    T: KvShard + 'static,
32{
33    pub async fn new(
34        addr: String,
35        node_id: NodeId,
36        metadata_manager: Arc<dyn MetadataManager>,
37        shard_manager: Arc<ShardManager<T>>,
38        // control_pool: Arc<ControlPool>,
39        // data_pool: Arc<DataPool>,
40    ) -> Self {
41        let (tx, rx) = tokio::sync::watch::channel(false);
42        FlareNode {
43            metadata_manager: metadata_manager,
44            addr,
45            node_id,
46            shard_manager: shard_manager,
47            // control_pool,
48            // data_pool,
49            close_signal_sender: tx,
50            close_signal_receiver: rx,
51        }
52    }
53
54    pub fn start_watch_stream(self: Arc<Self>) {
55        let mut rs = tokio_stream::wrappers::WatchStream::new(
56            self.metadata_manager.create_watch(),
57        );
58        tokio::spawn(async move {
59            let mut last_sync = 0;
60            loop {
61                if let Some(log_id) = rs.next().await {
62                    if log_id > last_sync {
63                        last_sync = log_id;
64                        let local_shards =
65                            self.metadata_manager.local_shards().await;
66                        self.shard_manager.sync_shards(&local_shards);
67                    }
68                    if self.close_signal_receiver.has_changed().unwrap_or(true)
69                    {
70                        info!("closed watch loop");
71                        break;
72                    }
73                }
74            }
75        });
76    }
77
78    pub async fn join(&self, peer_addr: &str) -> Result<(), Box<dyn Error>> {
79        info!("advertise addr {}", self.addr);
80        let peer_addr: Uri = Uri::from_str(peer_addr)?;
81        let channel = Channel::builder(peer_addr).connect_lazy();
82        let mut client = FlareControlClient::new(channel);
83        let resp = client
84            .join(JoinRequest {
85                node_id: self.node_id,
86                addr: self.addr.clone(),
87            })
88            .await?;
89        resp.into_inner();
90        Ok(())
91    }
92
93    pub async fn leave(&self) {
94        self.metadata_manager.leave().await;
95        info!("flare leave group");
96    }
97
98    pub async fn close(&self) {
99        self.leave().await;
100        self.close_signal_sender.send(true).unwrap();
101    }
102
103    pub async fn get_shard(
104        &self,
105        collection: &str,
106        key: &[u8],
107    ) -> Result<Arc<T>, FlareError> {
108        let option = self.metadata_manager.get_shard_id(collection, key).await;
109        if let Some(shard_id) = option {
110            self.shard_manager.get_any_shard(&shard_id.shard_ids)
111        } else {
112            Err(FlareError::NoCollection(collection.into()))
113        }
114    }
115}