1use crate::error::FlareError;
2use crate::metadata::MetadataManager;
3use crate::shard::{KvShard, ShardManager};
4use crate::NodeId;
5use flare_pb::flare_control_client::FlareControlClient;
6use flare_pb::JoinRequest;
7
8use std::error::Error;
9use std::str::FromStr;
10use std::sync::Arc;
11use tokio_stream::StreamExt;
12use tonic::transport::{Channel, Uri};
13use tracing::info;
14
15pub struct FlareNode<T>
16where
17 T: KvShard,
18{
19 pub metadata_manager: Arc<dyn MetadataManager>,
20 pub addr: String,
21 pub node_id: NodeId,
22 pub shard_manager: Arc<ShardManager<T>>,
25 close_signal_sender: tokio::sync::watch::Sender<bool>,
26 close_signal_receiver: tokio::sync::watch::Receiver<bool>,
27}
28
29impl<T> FlareNode<T>
30where
31 T: KvShard + 'static,
32{
33 pub async fn new(
34 addr: String,
35 node_id: NodeId,
36 metadata_manager: Arc<dyn MetadataManager>,
37 shard_manager: Arc<ShardManager<T>>,
38 ) -> Self {
41 let (tx, rx) = tokio::sync::watch::channel(false);
42 FlareNode {
43 metadata_manager: metadata_manager,
44 addr,
45 node_id,
46 shard_manager: shard_manager,
47 close_signal_sender: tx,
50 close_signal_receiver: rx,
51 }
52 }
53
54 pub fn start_watch_stream(self: Arc<Self>) {
55 let mut rs = tokio_stream::wrappers::WatchStream::new(
56 self.metadata_manager.create_watch(),
57 );
58 tokio::spawn(async move {
59 let mut last_sync = 0;
60 loop {
61 if let Some(log_id) = rs.next().await {
62 if log_id > last_sync {
63 last_sync = log_id;
64 let local_shards =
65 self.metadata_manager.local_shards().await;
66 self.shard_manager.sync_shards(&local_shards);
67 }
68 if self.close_signal_receiver.has_changed().unwrap_or(true)
69 {
70 info!("closed watch loop");
71 break;
72 }
73 }
74 }
75 });
76 }
77
78 pub async fn join(&self, peer_addr: &str) -> Result<(), Box<dyn Error>> {
79 info!("advertise addr {}", self.addr);
80 let peer_addr: Uri = Uri::from_str(peer_addr)?;
81 let channel = Channel::builder(peer_addr).connect_lazy();
82 let mut client = FlareControlClient::new(channel);
83 let resp = client
84 .join(JoinRequest {
85 node_id: self.node_id,
86 addr: self.addr.clone(),
87 })
88 .await?;
89 resp.into_inner();
90 Ok(())
91 }
92
93 pub async fn leave(&self) {
94 self.metadata_manager.leave().await;
95 info!("flare leave group");
96 }
97
98 pub async fn close(&self) {
99 self.leave().await;
100 self.close_signal_sender.send(true).unwrap();
101 }
102
103 pub async fn get_shard(
104 &self,
105 collection: &str,
106 key: &[u8],
107 ) -> Result<Arc<T>, FlareError> {
108 let option = self.metadata_manager.get_shard_id(collection, key).await;
109 if let Some(shard_id) = option {
110 self.shard_manager.get_any_shard(&shard_id.shard_ids)
111 } else {
112 Err(FlareError::NoCollection(collection.into()))
113 }
114 }
115}