1pub mod durable_commit_service;
2pub mod durable_message;
3pub mod edge;
4pub mod raft;
5use std::{
6 collections::{BTreeMap, HashMap},
7 net::SocketAddr,
8 ops::Deref,
9 sync::{self, Arc, RwLock},
10 time::Duration,
11};
12
13use durable_message::DurableMessageQuery;
14
15pub use asteroid_mq_model::NodeId;
16use asteroid_mq_model::{
17 codec::BINCODE_CONFIG, connection::EdgeNodeConnection, EndpointAddr, Message, MessageId,
18 TopicCode,
19};
20use durable_commit_service::DurableCommitService;
21use edge::{
22 auth::EdgeAuthService,
23 connection::{
24 ConnectionConfig, EdgeConnectionError, EdgeConnectionInstance, EdgeConnectionRef,
25 },
26 middleware::EdgeConnectionHandlerObject,
27 EdgeConfig, EdgeResult,
28};
29use edge::{packet::Auth, EdgeError, EdgeErrorKind};
30use futures_util::TryFutureExt;
31use openraft::Raft;
32use raft::{
33 cluster::{ClusterProvider, ClusterService},
34 log_storage::LogStorage,
35 network_factory::TcpNetworkService,
36 proposal::{DelegateMessage, EndpointOffline, EndpointOnline, LoadTopic, Proposal},
37 raft_node::TcpNode,
38 state_machine::{
39 node::NodeData,
40 topic::{
41 config::TopicConfig,
42 wait_ack::{AckSender, WaitAckHandle},
43 },
44 StateMachineStore,
45 },
46 MaybeLoadingRaft, TypeConfig,
47};
48use serde::{Deserialize, Serialize};
49use tokio_util::sync::CancellationToken;
50
51use crate::{
52 prelude::{DurableMessage, DurableService},
53 DEFAULT_TCP_SOCKET_ADDR,
54};
55
/// Configuration used to construct a [`Node`].
#[derive(Debug, Clone)]
pub struct NodeConfig {
    /// Identifier of this node; `Node::start` passes it to raft as the node id.
    pub id: NodeId,
    /// Socket address of this node; `Node::new` hands its string form to `TcpNode::new`.
    pub addr: SocketAddr,
    /// openraft configuration; it is validated in `Node::start`.
    pub raft: openraft::Config,
    /// Optional durable service. When present, `Node::new` builds a
    /// `DurableCommitService` from it and `Node::load_from_durable_service`
    /// reads topics and messages from it.
    pub durable: Option<DurableService>,
    /// Optional edge auth service. When present, `Node::handle_edge_request`
    /// checks every request against it before dispatching.
    pub edge_auth: Option<EdgeAuthService>,
}
64
65impl Default for NodeConfig {
66 fn default() -> Self {
67 Self {
68 id: NodeId::default(),
69 addr: DEFAULT_TCP_SOCKET_ADDR,
70 raft: openraft::Config::default(),
71 durable: None,
72 edge_auth: None,
73 }
74 }
75}
76
/// Shared state behind a [`Node`] handle (a `Node` wraps an `Arc<NodeInner>`).
#[derive(Debug)]
pub struct NodeInner {
    /// Raft handle holder; `Node::start` publishes the raft instance into it.
    raft: MaybeLoadingRaft,
    /// TCP network service created in `Node::new` and run in `Node::start`.
    network: TcpNetworkService,
    /// Configuration this node was built from.
    config: NodeConfig,
    /// Edge connections keyed by the peer id of the connected edge node.
    edge_connections: RwLock<HashMap<NodeId, Arc<EdgeConnectionInstance>>>,
    /// Maps an endpoint address to the edge node that owns it and its topic.
    edge_routing: tokio::sync::RwLock<HashMap<EndpointAddr, (NodeId, TopicCode)>>,
    /// Edge connection handler; initialised with `EdgeConnectionHandlerObject::basic()`.
    pub edge_handler: tokio::sync::RwLock<EdgeConnectionHandlerObject>,

    /// Ack senders keyed by message id; filled by `Node::set_wait_ack` and
    /// cleared by `Node::remove_wait_ack`.
    pub(crate) ack_waiting_pool: Arc<tokio::sync::RwLock<HashMap<MessageId, AckSender>>>,

    /// Present only when `config.durable` is set.
    pub(crate) durable_commit_service: Option<DurableCommitService>,
    /// Client of the scheduler runner that `Node::new` spawns.
    pub(crate) scheduler: tsuki_scheduler::AsyncSchedulerClient<tsuki_scheduler::runtime::Tokio>,
    /// Root cancellation token; child tokens are handed to the background
    /// services and `shutdown` cancels it.
    ct: CancellationToken,
}
98
99impl NodeInner {
100 pub async fn shutdown(&self) {
101 if let Some(raft) = self.raft.get_opt() {
102 let result = raft.shutdown().await;
103 if let Err(e) = result {
104 tracing::error!(?e, "raft shutdown error");
105 }
106 }
107 self.ct.cancel();
108 }
109}
/// Weak handle to a node's shared state; it does not keep the node alive.
///
/// The derived `Default` yields a reference whose `upgrade` returns `None`.
#[derive(Debug, Clone, Default)]
pub struct NodeRef {
    /// Weak pointer to the shared [`NodeInner`].
    inner: std::sync::Weak<NodeInner>,
}
114
115impl NodeRef {
116 pub(crate) fn upgrade(&self) -> Option<Node> {
117 self.inner.upgrade().map(|inner| Node { inner })
118 }
119}
120
/// Cheaply clonable handle to a node; clones share the same [`NodeInner`].
#[derive(Clone)]
pub struct Node {
    /// Shared state of the node.
    pub(crate) inner: Arc<NodeInner>,
}
125
126impl std::fmt::Debug for Node {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("Node")
129 .field("id", &self.id())
130 .field("config", &self.config())
131 .finish()
132 }
133}
134
135impl Deref for Node {
136 type Target = NodeInner;
137 fn deref(&self) -> &Self::Target {
138 &self.inner
139 }
140}
141
142impl Node {
143 pub async fn raft(&self) -> Raft<TypeConfig> {
145 self.raft.get().await
146 }
147 pub fn config(&self) -> &NodeConfig {
149 &self.config
150 }
151 pub fn new(config: NodeConfig) -> Self {
153 let ct = CancellationToken::new();
154 let raft = MaybeLoadingRaft::new();
155 let network = raft.net_work_service(
156 config.id,
157 TcpNode::new(config.addr.to_string()),
158 ct.child_token(),
159 );
160 let scheduler_runner = tsuki_scheduler::AsyncSchedulerRunner::tokio()
161 .with_execute_duration(Duration::from_secs(1));
162 let scheduler_client = scheduler_runner.client();
163 let scheduler_running =
164 scheduler_runner.run_with_shutdown_signal(Box::pin(ct.child_token().cancelled_owned()));
165 tokio::spawn(scheduler_running);
166 let inner = NodeInner {
167 edge_connections: RwLock::new(HashMap::new()),
168 edge_routing: tokio::sync::RwLock::new(HashMap::new()),
169 edge_handler: tokio::sync::RwLock::new(EdgeConnectionHandlerObject::basic()),
171 durable_commit_service: config
172 .durable
173 .as_ref()
174 .map(|ds| DurableCommitService::new(raft.clone(), ds.clone(), ct.child_token())),
175 config,
176 raft,
177 network,
178 scheduler: scheduler_client,
179 ack_waiting_pool: Default::default(),
180 ct,
181 };
182 Self {
183 inner: Arc::new(inner),
184 }
185 }
186 pub async fn start<C: ClusterProvider>(
188 &self,
189 mut cluster_provider: C,
190 ) -> Result<(), crate::Error> {
191 if self.raft_opt().is_some() {
192 return Ok(());
193 }
194 let cluster_service_ct = self.ct.child_token();
195 let node_ref = self.node_ref();
196 let id = self.id();
197 let maybe_loading_raft = self.raft.clone();
198 let tcp_service = self.network.clone();
199 let state_machine_store = StateMachineStore::new(node_ref);
200 let raft_config = self
201 .config
202 .raft
203 .clone()
204 .validate()
205 .map_err(crate::Error::contextual_custom("validate raft config"))?;
206 tcp_service.run_service();
207 if let Some(durable_commit_service) = &self.durable_commit_service {
209 durable_commit_service.clone().spawn()
210 }
211 let raft = Raft::<TypeConfig>::new(
212 id,
213 Arc::new(raft_config.clone()),
214 tcp_service.clone(),
215 LogStorage::default(),
216 Arc::new(state_machine_store),
217 )
218 .await
219 .map_err(crate::Error::contextual_custom("create raft node"))?;
220 let pristine_nodes = cluster_provider.pristine_nodes().await?;
222 tracing::info!(?pristine_nodes);
223
224 if pristine_nodes.contains_key(&id) {
225 loop {
226 let mut peer_nodes = pristine_nodes.clone();
227 peer_nodes.remove(&id);
228 let mut ensured_nodes = BTreeMap::new();
229 for (peer_id, peer_addr) in peer_nodes.iter() {
230 if ensured_nodes.contains_key(peer_id) {
231 continue;
232 }
233 let ensure_result = tcp_service
234 .ensure_connection(*peer_id, peer_addr.clone())
235 .await;
236 if let Err(e) = ensure_result {
237 tracing::warn!("failed to ensure connection to {}: {}", peer_id, e);
238 } else {
239 ensured_nodes.insert(*peer_id, peer_addr.clone());
240 tracing::trace!("connection to {} ensured", peer_id);
241 }
242 }
243 if ensured_nodes.len() == peer_nodes.len() {
244 break;
245 } else {
246 tokio::time::sleep(Duration::from_millis(raft_config.heartbeat_interval)).await;
247 }
248 }
249 maybe_loading_raft.set(raft.clone());
250 raft.initialize(
251 pristine_nodes
252 .into_iter()
253 .map(|(k, n)| (k, TcpNode::new(n)))
254 .collect::<BTreeMap<_, _>>(),
255 )
256 .await
257 .map_err(crate::Error::contextual_custom("init raft node"))?;
258
259 raft.wait(None)
261 .metrics(
262 |rm| rm.current_leader.is_some(),
263 "wait for leader to be elected",
264 )
265 .await
266 .map_err(crate::Error::contextual_custom("wait for leader"))?;
267 } else {
268 return Err(crate::Error::unknown(format!(
269 "{id} not in cluster: {pristine_nodes:?}"
270 )));
271 }
272 let cluster_service =
273 ClusterService::new(cluster_provider, tcp_service, cluster_service_ct);
274 cluster_service.spawn();
275 tracing::info!("node started");
276 Ok(())
277 }
278
279 #[tracing::instrument(skip_all)]
281 pub async fn load_from_durable_service(&self) -> Result<(), crate::Error> {
282 const PAGE_SIZE: u32 = 1000;
283 let Some(durable) = self.config.durable.as_ref().cloned() else {
284 return Ok(());
285 };
286 let topics = durable
287 .topic_list()
288 .await
289 .map_err(crate::Error::contextual("load topic list"))?;
290 let mut task_set = tokio::task::JoinSet::new();
291 for topic in topics {
292 let node = self.clone();
293 let durable = durable.clone();
294 let task = async move {
295 let mut query = DurableMessageQuery {
296 limit: PAGE_SIZE,
297 offset: 0,
298 };
299 let mut queue = Vec::new();
300 let code = topic.code.clone();
301 loop {
302 let page = durable
303 .batch_retrieve(code.clone(), query)
304 .await
305 .map_err(crate::Error::contextual_custom("batch retrieve"))?;
306 let page_len = page.len();
307 queue.extend(page);
308 if page_len < PAGE_SIZE as usize {
309 tracing::info!(size = ?queue.len(), "message chunk loading finished");
310 break;
311 } else {
312 tracing::info!(size = ?queue.len(), "message chunk loaded");
313 query = query.next_page()
314 }
315 }
316 let result = node.load_topic(topic, queue).await;
317 if let Err(e) = result {
318 match e.kind {
319 crate::error::ErrorKind::TopicAlreadyExists
320 | crate::error::ErrorKind::NotLeader => {
321 }
323 _ => {
324 return Err(e);
325 }
326 }
327 tracing::error!(?e, "load topic error");
328 }
329 tracing::info!(topic = %code, "load topic done");
330 crate::Result::Ok(())
331 };
332 task_set.spawn(task);
333 }
334 while let Some(task) = task_set.join_next().await {
335 task.map_err(|e| crate::Error::custom("runtime join error", e))??;
336 }
337 Ok(())
338 }
339 pub(crate) async fn propose(&self, proposal: Proposal) -> Result<(), crate::Error> {
343 let raft = self.raft().await;
344 let metric = raft
345 .wait(None)
346 .metrics(
347 |rm| rm.current_leader.is_some(),
348 "wait for leader to be elected",
349 )
350 .await
351 .map_err(crate::Error::contextual_custom(
352 "wait for leader when proposal",
353 ))?;
354 let leader = metric.current_leader.expect("leader should be elected");
355 let this = self.id();
356 let is_leader = this == leader;
357 tracing::debug!(?proposal, is_leader, "trigger proposal");
358 let client_write_result = if is_leader {
359 raft.client_write(proposal)
360 .await
361 .map_err(crate::Error::contextual_custom("client write"))?
362 } else {
363 let Some(connection) = self.network.get_connection(leader).await else {
364 return Err(crate::Error::new(
365 "no connection to leader",
366 crate::error::ErrorKind::Offline,
367 ));
368 };
369 connection.propose(proposal).await?
370 };
371 let id = client_write_result.log_id();
372 raft.wait(None)
373 .applied_index_at_least(Some(id.index), "proposal resolved")
374 .await
375 .map_err(crate::Error::contextual_custom("wait for proposal"))?;
376 Ok(())
377 }
378
379 pub fn raft_opt(&self) -> Option<Raft<TypeConfig>> {
380 self.raft.get_opt()
381 }
382
383 pub fn node_ref(&self) -> NodeRef {
385 NodeRef {
386 inner: Arc::downgrade(&self.inner),
387 }
388 }
389
390 pub async fn create_edge_connection<C: EdgeNodeConnection>(
392 &self,
393 conn: C,
394 edge_config: EdgeConfig,
395 ) -> Result<NodeId, EdgeConnectionError> {
396 let config = ConnectionConfig {
397 attached_node: self.node_ref(),
398 auth: edge_config.peer_auth.into(),
399 peer_id: edge_config.peer_id,
400 };
401 let conn_inst = EdgeConnectionInstance::init(config, conn).await?;
402 let peer = conn_inst.peer_id;
403 {
404 let mut wg = self.edge_connections.write().unwrap();
405 wg.insert(peer, Arc::new(conn_inst));
406 tracing::info!(?peer, "new edge connection established");
407 }
408 Ok(peer)
409 }
410 #[inline]
411 pub fn id(&self) -> NodeId {
412 self.config.id
413 }
414 #[inline(always)]
415 pub fn is(&self, id: NodeId) -> bool {
416 self.id() == id
417 }
418
419 pub(crate) fn blocking_get_edge_routing(&self, addr: &EndpointAddr) -> Option<NodeId> {
420 loop {
422 let routing = self.edge_routing.try_read();
423 match routing {
424 Ok(routing) => return Some(routing.get(addr)?.0),
425 Err(_e) => {
426 continue;
427 }
428 }
429 }
430 }
431 pub(crate) async fn set_edge_routing(
436 &self,
437 addr: EndpointAddr,
438 edge: NodeId,
439 topic: TopicCode,
440 ) {
441 self.edge_routing.write().await.insert(addr, (edge, topic));
442 }
443 pub(crate) async fn remove_edge_routing(&self, addr: &EndpointAddr) {
445 tracing::error!(?addr, "remove_edge_routing start");
446 self.edge_routing.write().await.remove(addr);
447 tracing::error!(?addr, "remove_edge_routing finished");
448 }
449 pub fn get_edge_connection(&self, to: NodeId) -> Option<EdgeConnectionRef> {
450 let connections = self.edge_connections.read().unwrap();
451 let conn = connections.get(&to)?;
452 if conn.is_alive() {
453 Some(conn.get_connection_ref())
454 } else {
455 None
456 }
457 }
458 pub async fn remove_edge_connection(&self, to: NodeId) {
459 let Some(_connection) = self.edge_connections.write().unwrap().remove(&to) else {
460 return;
461 };
462 let mut routing = self.edge_routing.write().await;
463 let mut offline_eps = Vec::new();
464 for (addr, (node_id, topic)) in routing.iter() {
465 if node_id == &to {
466 offline_eps.push((*addr, topic.clone()));
467 }
468 }
469 for (ref addr, _) in &offline_eps {
470 routing.remove(addr);
471 }
472 drop(routing);
473 let host = self.id();
474 for (addr, topic_code) in offline_eps {
475 let offline = EndpointOffline {
476 endpoint: addr,
477 host,
478 topic_code,
479 };
480 let node = self.clone();
481 tokio::spawn(async move {
482 let _ = node.propose(Proposal::EpOffline(offline)).await;
483 });
484 }
485 }
486 pub async fn check_ep_auth(&self, ep: &EndpointAddr, peer: &NodeId) -> Result<(), EdgeError> {
487 let routing = self.edge_routing.read().await;
488 if let Some((owner, _topic)) = routing.get(ep) {
489 if owner == peer {
490 Ok(())
491 } else {
492 Err(EdgeError::new(
493 "endpoint not owned by sender",
494 EdgeErrorKind::Unauthorized,
495 ))
496 }
497 } else {
498 Err(EdgeError::new(
499 "endpoint not found",
500 EdgeErrorKind::EndpointNotFound,
501 ))
502 }
503 }
504 pub(crate) async fn handle_edge_request(
505 &self,
506 from: NodeId,
507 edge_request_kind: edge::EdgeRequestEnum,
508 ) -> Result<edge::EdgeResponseEnum, edge::EdgeError> {
509 if let Some(auth) = self.config.edge_auth.as_ref() {
511 if let Err(e) = auth.check(from, &edge_request_kind).await {
512 return Err(EdgeError::with_message(
513 "auth check failed",
514 e.to_string(),
515 EdgeErrorKind::Unauthorized,
516 ));
517 }
518 }
519 match edge_request_kind {
520 edge::EdgeRequestEnum::SendMessage(edge_message) => {
521 let (message, topic_code) = edge_message.into_message();
522
523 let handle = self
524 .send_message(topic_code, message)
525 .map_err(|e| {
526 EdgeError::with_message(
527 "send message",
528 e.to_string(),
529 EdgeErrorKind::Internal,
530 )
531 })
532 .await?;
533 let response = handle.await;
534 Ok(edge::EdgeResponseEnum::SendMessage(EdgeResult::from_std(
535 response,
536 )))
537 }
538 edge::EdgeRequestEnum::EndpointOnline(online) => {
539 let topic_code = online.topic_code.clone();
540 let Some(node) = self.node_ref().upgrade() else {
541 return Err(EdgeError::with_message(
542 "set state",
543 "node dropped",
544 EdgeErrorKind::Internal,
545 ));
546 };
547 let endpoint = EndpointAddr::new_snowflake();
548 self.set_edge_routing(endpoint, from, topic_code.clone())
549 .await;
550 tracing::info!(?online, ?endpoint, "edge endpoint online");
551 let result = node
552 .propose(Proposal::EpOnline(EndpointOnline {
553 topic_code,
554 interests: online.interests,
555 endpoint,
556 host: node.id(),
557 }))
558 .await;
559 if let Err(e) = result {
560 self.remove_edge_routing(&endpoint).await;
562 return Err(EdgeError::with_message(
563 "endpoint online",
564 e.to_string(),
565 EdgeErrorKind::Internal,
566 ));
567 }
568 Ok(edge::EdgeResponseEnum::EndpointOnline(endpoint))
569 }
570 edge::EdgeRequestEnum::EndpointOffline(offline) => {
571 let topic_code = offline.topic_code.clone();
572
573 let Some(node) = self.node_ref().upgrade() else {
574 return Err(EdgeError::with_message(
575 "set state",
576 "node dropped",
577 EdgeErrorKind::Internal,
578 ));
579 };
580 self.check_ep_auth(&offline.endpoint, &from).await?;
581 node.propose(Proposal::EpOffline(EndpointOffline {
582 topic_code: topic_code.clone(),
583 endpoint: offline.endpoint,
584 host: from,
585 }))
586 .await
587 .map_err(|e| {
588 EdgeError::with_message(
589 "endpoint offline",
590 e.to_string(),
591 EdgeErrorKind::Internal,
592 )
593 })?;
594 tracing::error!("proposed");
595 self.remove_edge_routing(&offline.endpoint).await;
596 tracing::error!("finished");
597 Ok(edge::EdgeResponseEnum::EndpointOffline)
598 }
599 edge::EdgeRequestEnum::EndpointInterest(interest) => {
600 let Some(node) = self.node_ref().upgrade() else {
601 return Err(EdgeError::with_message(
602 "set state",
603 "node dropped",
604 EdgeErrorKind::Internal,
605 ));
606 };
607 self.check_ep_auth(&interest.endpoint, &from).await?;
608 node.propose(Proposal::EpInterest(interest.clone()))
609 .await
610 .map_err(|e| {
611 EdgeError::with_message(
612 "endpoint interest",
613 e.to_string(),
614 EdgeErrorKind::Internal,
615 )
616 })?;
617 Ok(edge::EdgeResponseEnum::EndpointInterest)
618 }
619 edge::EdgeRequestEnum::SetState(set_state) => {
620 let Some(node) = self.node_ref().upgrade() else {
621 return Err(EdgeError::with_message(
622 "set state",
623 "node dropped",
624 EdgeErrorKind::Internal,
625 ));
626 };
627 for ep in set_state.update.status.keys() {
628 self.check_ep_auth(ep, &from).await?;
629 }
630 node.propose(Proposal::SetState(set_state.clone()))
631 .await
632 .map_err(|e| {
633 EdgeError::with_message("set state", e.to_string(), EdgeErrorKind::Internal)
634 })?;
635 Ok(edge::EdgeResponseEnum::SetState)
636 }
637 }
638 }
639
640 pub async fn is_leader(&self) -> bool {
641 let raft = self.raft().await;
642 raft.ensure_linearizable().await.is_ok()
643 }
644 pub async fn wait_for_leader(&self) -> Result<(), crate::Error> {
645 let raft = self.raft().await;
646 tracing::info!("wait for leader to be elected");
647 raft.wait(None)
648 .metrics(|rm| rm.current_leader.is_some(), "wait for leader")
649 .await
650 .map_err(crate::Error::contextual_custom("wait for leader"))?;
651 Ok(())
652 }
653 pub async fn load_topic<C: Into<TopicConfig>>(
654 &self,
655 config: C,
656 queue: Vec<DurableMessage>,
657 ) -> Result<(), crate::Error> {
658 let config: TopicConfig = config.into();
659 tracing::info!(?config, "load_topic");
660 self.propose(Proposal::LoadTopic(LoadTopic { config, queue }))
661 .await?;
662 Ok(())
663 }
664 pub async fn create_new_topic<C: Into<TopicConfig>>(&self, config: C) -> crate::Result<()> {
665 self.load_topic(config, Vec::new()).await
666 }
667 pub(crate) async fn remove_wait_ack(&self, id: MessageId) {
669 self.ack_waiting_pool.write().await.remove(&id);
670 }
671
672 pub async fn set_wait_ack(&self, id: MessageId) -> WaitAckHandle {
674 let (sender, handle) = WaitAckHandle::new(id);
675 self.ack_waiting_pool.write().await.insert(id, sender);
676 handle
677 }
678
679 pub async fn send_message(
681 &self,
682 topic: TopicCode,
683 message: Message,
684 ) -> Result<WaitAckHandle, crate::Error> {
685 let id = message.id();
686 let source = self.id();
687 let handle = self.set_wait_ack(id).await;
688 let proposal_result = self
689 .propose(Proposal::DelegateMessage(DelegateMessage {
690 topic,
691 message,
692 source,
693 }))
694 .await;
695 if proposal_result.is_err() {
696 self.remove_wait_ack(id).await;
697 tracing::warn!(?proposal_result, "proposal error");
698 proposal_result?;
699 }
700 Ok(handle)
701 }
702 pub async fn snapshot_data(&self) -> Result<NodeData, crate::Error> {
703 let raft = self.raft().await;
704 let mut snapshot = raft
705 .get_snapshot()
706 .await
707 .map_err(crate::Error::contextual_custom("get snapshot"))?
708 .ok_or(crate::Error::custom(
709 "get snapshot",
710 "raft instance missing snapshot",
711 ))?;
712 let data = bincode::serde::decode_from_std_read::<NodeData, _, _>(
713 &mut snapshot.snapshot,
714 BINCODE_CONFIG,
715 )
716 .map_err(crate::Error::contextual_custom("get snapshot"))?;
717 Ok(data)
718 }
719}
720
/// Serializable node-to-node routing record.
// NOTE(review): not referenced anywhere in the visible source; the field
// meanings below are inferred from their names — confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct N2nRoutingInfo {
    /// Presumably the node to forward to next — TODO confirm.
    next_jump: NodeId,
    /// Presumably the number of hops for this route — TODO confirm.
    hops: u32,
}
726
/// Pairs a weak handle to a node's shared state with a peer's configuration.
// NOTE(review): not referenced anywhere in the visible source; confirm it is
// still in use.
pub struct Connection {
    /// Weak pointer to the [`NodeInner`] this connection is attached to.
    pub attached_node: sync::Weak<NodeInner>,
    /// Configuration describing the peer.
    pub peer_info: NodeConfig,
}
731
// NOTE(review): `shutdown` is the only visible place that cancels `ct`; confirm
// whether dropping a node without calling `shutdown` should also cancel it.
/// Explicit `Drop` implementation with an empty body: no cleanup runs here.
impl Drop for NodeInner {
    fn drop(&mut self) {}
}