1use flare_dht::cli::{
2 CollectionOperation, FlareCli, FlareCommands, ServerArgs,
3};
4use flare_dht::metadata::raft::FlareMetadataManager;
5use flare_dht::metadata::MetadataManager;
6use flare_dht::proto::flare_control_server::FlareControlServer;
7use flare_dht::proto::flare_kv_client::FlareKvClient;
8use flare_dht::proto::flare_kv_server::FlareKvServer;
9use flare_dht::proto::CreateCollectionRequest;
10use flare_dht::rpc_server::control_api::FlareControlService;
11use flare_dht::rpc_server::kv_api::FlareKvService;
12use flare_dht::shard::{HashMapShard, HashMapShardFactory, ShardManager};
13use flare_dht::FlareNode;
14use std::error::Error;
15use std::net::{IpAddr, Ipv4Addr, SocketAddr};
16use std::sync::Arc;
17use tonic::transport::Server;
18use tonic::Request;
19use tracing::info;
20
21pub async fn start_server(
22 options: ServerArgs,
23) -> Result<Arc<FlareNode<HashMapShard>>, Box<dyn Error>> {
24 info!("use option {options:?}");
25
26 let node_id = options.get_node_id();
27 info!("use node_id: {node_id}");
28
29 let z_session = zenoh::open(zenoh::Config::default()).await.unwrap();
30 let prefix = format!("flare/{}/nodes", options.cluster_id);
31
32 let metadata_manager: Arc<FlareMetadataManager> = Arc::new(
33 FlareMetadataManager::new(
34 node_id,
35 options.get_addr(),
36 options.clone(),
37 z_session.clone(),
38 &prefix,
39 )
40 .await,
41 );
42 metadata_manager.initialize().await?;
43 let shard_manager =
44 Arc::new(ShardManager::new(Box::new(HashMapShardFactory {})));
45 let flare_node = FlareNode::new(
46 options.get_addr(),
47 node_id,
48 metadata_manager.clone(),
49 shard_manager,
50 )
53 .await;
54
55 let shared_node = Arc::new(flare_node);
56 let flare_node = shared_node.clone();
57 flare_node.start_watch_stream();
58 let flare_kv = FlareKvService::new(shared_node.clone());
59 let flare_control = FlareControlService::new(metadata_manager.clone());
60
61 if !options.not_server {
63 let socket = SocketAddr::new(
64 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
65 options.port,
66 );
67 info!("start on {}", socket);
68 let reflection_server_v1a =
69 tonic_reflection::server::Builder::configure()
70 .register_encoded_file_descriptor_set(
71 flare_dht::proto::FILE_DESCRIPTOR_SET,
72 )
73 .build_v1alpha()
74 .unwrap();
75
76 let reflection_server_v1 =
77 tonic_reflection::server::Builder::configure()
78 .register_encoded_file_descriptor_set(
79 flare_dht::proto::FILE_DESCRIPTOR_SET,
80 )
81 .build_v1()
82 .unwrap();
83
84 if let Some(addr) = options.peer_addr {
85 let flare_node = shared_node.clone();
86 tokio::spawn(async move {
87 let node = flare_node.clone();
88 node.join(&addr).await.unwrap()
89 });
90 };
91
92 tokio::spawn(async move {
93 Server::builder()
94 .add_service(reflection_server_v1a)
95 .add_service(reflection_server_v1)
96 .add_service(FlareKvServer::new(flare_kv))
97 .add_service(FlareControlServer::new(flare_control))
98 .serve(socket)
99 .await
100 .unwrap();
101 });
102 }
103
104 Ok(shared_node)
105}
106
107pub async fn handle_cli(command: FlareCli) -> Result<(), Box<dyn Error>> {
108 match command.command {
109 FlareCommands::Server(server_args) => handle_server(server_args).await,
110 FlareCommands::Collection { opt } => handle_collection(opt).await,
111 }
112}
113
114async fn handle_server(server_args: ServerArgs) -> Result<(), Box<dyn Error>> {
115 let flare_node = start_server(server_args).await?;
116
117 match tokio::signal::ctrl_c().await {
118 Ok(()) => {}
119 Err(err) => {
120 eprintln!("Unable to listen for shutdown signal: {}", err);
121 }
123 }
124 info!("starting a clean up for shutdown");
125 flare_node.leave().await;
126 info!("done clean up");
127 Ok(())
128}
129
130async fn handle_collection(
131 opt: CollectionOperation,
132) -> Result<(), Box<dyn Error>> {
133 info!("collection {:?}", opt);
134 match opt {
135 CollectionOperation::Create {
136 name,
137 shard_count: partitions,
138 connection,
139 } => {
140 let mut client =
141 FlareKvClient::connect(connection.server_url).await?;
142 let resp = client
143 .create_collection(Request::new(CreateCollectionRequest {
144 partition_count: partitions as i32,
145 name: name,
146 ..Default::default()
147 }))
148 .await?;
149 info!("RESP: {:?}\n", resp);
150 }
151 }
152 Ok(())
153}