Skip to main content

asteroid_mq/protocol/
node.rs

1pub mod durable_commit_service;
2pub mod durable_message;
3pub mod edge;
4pub mod raft;
5use std::{
6    collections::{BTreeMap, HashMap},
7    net::SocketAddr,
8    ops::Deref,
9    sync::{self, Arc, RwLock},
10    time::Duration,
11};
12
13use durable_message::DurableMessageQuery;
14
15pub use asteroid_mq_model::NodeId;
16use asteroid_mq_model::{
17    codec::BINCODE_CONFIG, connection::EdgeNodeConnection, EndpointAddr, Message, MessageId,
18    TopicCode,
19};
20use durable_commit_service::DurableCommitService;
21use edge::{
22    auth::EdgeAuthService,
23    connection::{
24        ConnectionConfig, EdgeConnectionError, EdgeConnectionInstance, EdgeConnectionRef,
25    },
26    middleware::EdgeConnectionHandlerObject,
27    EdgeConfig, EdgeResult,
28};
29use edge::{packet::Auth, EdgeError, EdgeErrorKind};
30use futures_util::TryFutureExt;
31use openraft::Raft;
32use raft::{
33    cluster::{ClusterProvider, ClusterService},
34    log_storage::LogStorage,
35    network_factory::TcpNetworkService,
36    proposal::{DelegateMessage, EndpointOffline, EndpointOnline, LoadTopic, Proposal},
37    raft_node::TcpNode,
38    state_machine::{
39        node::NodeData,
40        topic::{
41            config::TopicConfig,
42            wait_ack::{AckSender, WaitAckHandle},
43        },
44        StateMachineStore,
45    },
46    MaybeLoadingRaft, TypeConfig,
47};
48use serde::{Deserialize, Serialize};
49use tokio_util::sync::CancellationToken;
50
51use crate::{
52    prelude::{DurableMessage, DurableService},
53    DEFAULT_TCP_SOCKET_ADDR,
54};
55
56#[derive(Debug, Clone)]
57pub struct NodeConfig {
58    pub id: NodeId,
59    pub addr: SocketAddr,
60    pub raft: openraft::Config,
61    pub durable: Option<DurableService>,
62    pub edge_auth: Option<EdgeAuthService>,
63}
64
65impl Default for NodeConfig {
66    fn default() -> Self {
67        Self {
68            id: NodeId::default(),
69            addr: DEFAULT_TCP_SOCKET_ADDR,
70            raft: openraft::Config::default(),
71            durable: None,
72            edge_auth: None,
73        }
74    }
75}
76
77#[derive(Debug)]
78pub struct NodeInner {
79    raft: MaybeLoadingRaft,
80    network: TcpNetworkService,
81    config: NodeConfig,
82    edge_connections: RwLock<HashMap<NodeId, Arc<EdgeConnectionInstance>>>,
83    edge_routing: tokio::sync::RwLock<HashMap<EndpointAddr, (NodeId, TopicCode)>>,
84    pub edge_handler: tokio::sync::RwLock<EdgeConnectionHandlerObject>,
85
86    // May we delete this and access the node api from edge sdk with tokio channel socket connection?
87    // #[deprecated]
88    // topics: RwLock<HashMap<TopicCode, Topic>>,
89    /// ack responser pool
90    pub(crate) ack_waiting_pool: Arc<tokio::sync::RwLock<HashMap<MessageId, AckSender>>>,
91
92    /// ensure operations on the same topic are linearized
93    pub(crate) durable_commit_service: Option<DurableCommitService>,
94    /// time scheduler
95    pub(crate) scheduler: tsuki_scheduler::AsyncSchedulerClient<tsuki_scheduler::runtime::Tokio>,
96    ct: CancellationToken,
97}
98
99impl NodeInner {
100    pub async fn shutdown(&self) {
101        if let Some(raft) = self.raft.get_opt() {
102            let result = raft.shutdown().await;
103            if let Err(e) = result {
104                tracing::error!(?e, "raft shutdown error");
105            }
106        }
107        self.ct.cancel();
108    }
109}
110#[derive(Debug, Clone, Default)]
111pub struct NodeRef {
112    inner: std::sync::Weak<NodeInner>,
113}
114
115impl NodeRef {
116    pub(crate) fn upgrade(&self) -> Option<Node> {
117        self.inner.upgrade().map(|inner| Node { inner })
118    }
119}
120
121#[derive(Clone)]
122pub struct Node {
123    pub(crate) inner: Arc<NodeInner>,
124}
125
126impl std::fmt::Debug for Node {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct("Node")
129            .field("id", &self.id())
130            .field("config", &self.config())
131            .finish()
132    }
133}
134
135impl Deref for Node {
136    type Target = NodeInner;
137    fn deref(&self) -> &Self::Target {
138        &self.inner
139    }
140}
141
142impl Node {
143    /// Get the inner openraft structure
144    pub async fn raft(&self) -> Raft<TypeConfig> {
145        self.raft.get().await
146    }
147    /// Get node config
148    pub fn config(&self) -> &NodeConfig {
149        &self.config
150    }
151    /// Create a new node
152    pub fn new(config: NodeConfig) -> Self {
153        let ct = CancellationToken::new();
154        let raft = MaybeLoadingRaft::new();
155        let network = raft.net_work_service(
156            config.id,
157            TcpNode::new(config.addr.to_string()),
158            ct.child_token(),
159        );
160        let scheduler_runner = tsuki_scheduler::AsyncSchedulerRunner::tokio()
161            .with_execute_duration(Duration::from_secs(1));
162        let scheduler_client = scheduler_runner.client();
163        let scheduler_running =
164            scheduler_runner.run_with_shutdown_signal(Box::pin(ct.child_token().cancelled_owned()));
165        tokio::spawn(scheduler_running);
166        let inner = NodeInner {
167            edge_connections: RwLock::new(HashMap::new()),
168            edge_routing: tokio::sync::RwLock::new(HashMap::new()),
169            // topics: RwLock::new(HashMap::new()),
170            edge_handler: tokio::sync::RwLock::new(EdgeConnectionHandlerObject::basic()),
171            durable_commit_service: config
172                .durable
173                .as_ref()
174                .map(|ds| DurableCommitService::new(raft.clone(), ds.clone(), ct.child_token())),
175            config,
176            raft,
177            network,
178            scheduler: scheduler_client,
179            ack_waiting_pool: Default::default(),
180            ct,
181        };
182        Self {
183            inner: Arc::new(inner),
184        }
185    }
186    /// Start running, this will start tcp connection service and node discovery service
187    pub async fn start<C: ClusterProvider>(
188        &self,
189        mut cluster_provider: C,
190    ) -> Result<(), crate::Error> {
191        if self.raft_opt().is_some() {
192            return Ok(());
193        }
194        let cluster_service_ct = self.ct.child_token();
195        let node_ref = self.node_ref();
196        let id = self.id();
197        let maybe_loading_raft = self.raft.clone();
198        let tcp_service = self.network.clone();
199        let state_machine_store = StateMachineStore::new(node_ref);
200        let raft_config = self
201            .config
202            .raft
203            .clone()
204            .validate()
205            .map_err(crate::Error::contextual_custom("validate raft config"))?;
206        tcp_service.run_service();
207        // start up durable commit service
208        if let Some(durable_commit_service) = &self.durable_commit_service {
209            durable_commit_service.clone().spawn()
210        }
211        let raft = Raft::<TypeConfig>::new(
212            id,
213            Arc::new(raft_config.clone()),
214            tcp_service.clone(),
215            LogStorage::default(),
216            Arc::new(state_machine_store),
217        )
218        .await
219        .map_err(crate::Error::contextual_custom("create raft node"))?;
220        // waiting for members contain self
221        let pristine_nodes = cluster_provider.pristine_nodes().await?;
222        tracing::info!(?pristine_nodes);
223
224        if pristine_nodes.contains_key(&id) {
225            loop {
226                let mut peer_nodes = pristine_nodes.clone();
227                peer_nodes.remove(&id);
228                let mut ensured_nodes = BTreeMap::new();
229                for (peer_id, peer_addr) in peer_nodes.iter() {
230                    if ensured_nodes.contains_key(peer_id) {
231                        continue;
232                    }
233                    let ensure_result = tcp_service
234                        .ensure_connection(*peer_id, peer_addr.clone())
235                        .await;
236                    if let Err(e) = ensure_result {
237                        tracing::warn!("failed to ensure connection to {}: {}", peer_id, e);
238                    } else {
239                        ensured_nodes.insert(*peer_id, peer_addr.clone());
240                        tracing::trace!("connection to {} ensured", peer_id);
241                    }
242                }
243                if ensured_nodes.len() == peer_nodes.len() {
244                    break;
245                } else {
246                    tokio::time::sleep(Duration::from_millis(raft_config.heartbeat_interval)).await;
247                }
248            }
249            maybe_loading_raft.set(raft.clone());
250            raft.initialize(
251                pristine_nodes
252                    .into_iter()
253                    .map(|(k, n)| (k, TcpNode::new(n)))
254                    .collect::<BTreeMap<_, _>>(),
255            )
256            .await
257            .map_err(crate::Error::contextual_custom("init raft node"))?;
258
259            // wait for election
260            raft.wait(None)
261                .metrics(
262                    |rm| rm.current_leader.is_some(),
263                    "wait for leader to be elected",
264                )
265                .await
266                .map_err(crate::Error::contextual_custom("wait for leader"))?;
267        } else {
268            return Err(crate::Error::unknown(format!(
269                "{id} not in cluster: {pristine_nodes:?}"
270            )));
271        }
272        let cluster_service =
273            ClusterService::new(cluster_provider, tcp_service, cluster_service_ct);
274        cluster_service.spawn();
275        tracing::info!("node started");
276        Ok(())
277    }
278
279    /// load existed topic from durable service
280    #[tracing::instrument(skip_all)]
281    pub async fn load_from_durable_service(&self) -> Result<(), crate::Error> {
282        const PAGE_SIZE: u32 = 1000;
283        let Some(durable) = self.config.durable.as_ref().cloned() else {
284            return Ok(());
285        };
286        let topics = durable
287            .topic_list()
288            .await
289            .map_err(crate::Error::contextual("load topic list"))?;
290        let mut task_set = tokio::task::JoinSet::new();
291        for topic in topics {
292            let node = self.clone();
293            let durable = durable.clone();
294            let task = async move {
295                let mut query = DurableMessageQuery {
296                    limit: PAGE_SIZE,
297                    offset: 0,
298                };
299                let mut queue = Vec::new();
300                let code = topic.code.clone();
301                loop {
302                    let page = durable
303                        .batch_retrieve(code.clone(), query)
304                        .await
305                        .map_err(crate::Error::contextual_custom("batch retrieve"))?;
306                    let page_len = page.len();
307                    queue.extend(page);
308                    if page_len < PAGE_SIZE as usize {
309                        tracing::info!(size = ?queue.len(), "message chunk loading finished");
310                        break;
311                    } else {
312                        tracing::info!(size = ?queue.len(), "message chunk loaded");
313                        query = query.next_page()
314                    }
315                }
316                let result = node.load_topic(topic, queue).await;
317                if let Err(e) = result {
318                    match e.kind {
319                        crate::error::ErrorKind::TopicAlreadyExists
320                        | crate::error::ErrorKind::NotLeader => {
321                            // do nothing
322                        }
323                        _ => {
324                            return Err(e);
325                        }
326                    }
327                    tracing::error!(?e, "load topic error");
328                }
329                tracing::info!(topic = %code, "load topic done");
330                crate::Result::Ok(())
331            };
332            task_set.spawn(task);
333        }
334        while let Some(task) = task_set.join_next().await {
335            task.map_err(|e| crate::Error::custom("runtime join error", e))??;
336        }
337        Ok(())
338    }
339    /// Propose a raft proposal
340    ///
341    /// The final behavior is determined by the role of raft node.
342    pub(crate) async fn propose(&self, proposal: Proposal) -> Result<(), crate::Error> {
343        let raft = self.raft().await;
344        let metric = raft
345            .wait(None)
346            .metrics(
347                |rm| rm.current_leader.is_some(),
348                "wait for leader to be elected",
349            )
350            .await
351            .map_err(crate::Error::contextual_custom(
352                "wait for leader when proposal",
353            ))?;
354        let leader = metric.current_leader.expect("leader should be elected");
355        let this = self.id();
356        let is_leader = this == leader;
357        tracing::debug!(?proposal, is_leader, "trigger proposal");
358        let client_write_result = if is_leader {
359            raft.client_write(proposal)
360                .await
361                .map_err(crate::Error::contextual_custom("client write"))?
362        } else {
363            let Some(connection) = self.network.get_connection(leader).await else {
364                return Err(crate::Error::new(
365                    "no connection to leader",
366                    crate::error::ErrorKind::Offline,
367                ));
368            };
369            connection.propose(proposal).await?
370        };
371        let id = client_write_result.log_id();
372        raft.wait(None)
373            .applied_index_at_least(Some(id.index), "proposal resolved")
374            .await
375            .map_err(crate::Error::contextual_custom("wait for proposal"))?;
376        Ok(())
377    }
378
379    pub fn raft_opt(&self) -> Option<Raft<TypeConfig>> {
380        self.raft.get_opt()
381    }
382
383    /// Get a weak reference of this node
384    pub fn node_ref(&self) -> NodeRef {
385        NodeRef {
386            inner: Arc::downgrade(&self.inner),
387        }
388    }
389
390    /// Create a connection to a edge node.
391    pub async fn create_edge_connection<C: EdgeNodeConnection>(
392        &self,
393        conn: C,
394        edge_config: EdgeConfig,
395    ) -> Result<NodeId, EdgeConnectionError> {
396        let config = ConnectionConfig {
397            attached_node: self.node_ref(),
398            auth: edge_config.peer_auth.into(),
399            peer_id: edge_config.peer_id,
400        };
401        let conn_inst = EdgeConnectionInstance::init(config, conn).await?;
402        let peer = conn_inst.peer_id;
403        {
404            let mut wg = self.edge_connections.write().unwrap();
405            wg.insert(peer, Arc::new(conn_inst));
406            tracing::info!(?peer, "new edge connection established");
407        }
408        Ok(peer)
409    }
410    #[inline]
411    pub fn id(&self) -> NodeId {
412        self.config.id
413    }
414    #[inline(always)]
415    pub fn is(&self, id: NodeId) -> bool {
416        self.id() == id
417    }
418
419    pub(crate) fn blocking_get_edge_routing(&self, addr: &EndpointAddr) -> Option<NodeId> {
420        // we can do this for we never hold write guard across an await point
421        loop {
422            let routing = self.edge_routing.try_read();
423            match routing {
424                Ok(routing) => return Some(routing.get(addr)?.0),
425                Err(_e) => {
426                    continue;
427                }
428            }
429        }
430    }
431    // pub(crate) async fn get_edge_routing(&self, addr: &EndpointAddr) -> Option<NodeId> {
432    //     let routing = self.edge_routing.read().await;
433    //     Some(routing.get(addr)?.0)
434    // }
435    pub(crate) async fn set_edge_routing(
436        &self,
437        addr: EndpointAddr,
438        edge: NodeId,
439        topic: TopicCode,
440    ) {
441        self.edge_routing.write().await.insert(addr, (edge, topic));
442    }
443    /// Read in sync, write in async
444    pub(crate) async fn remove_edge_routing(&self, addr: &EndpointAddr) {
445        tracing::error!(?addr, "remove_edge_routing start");
446        self.edge_routing.write().await.remove(addr);
447        tracing::error!(?addr, "remove_edge_routing finished");
448    }
449    pub fn get_edge_connection(&self, to: NodeId) -> Option<EdgeConnectionRef> {
450        let connections = self.edge_connections.read().unwrap();
451        let conn = connections.get(&to)?;
452        if conn.is_alive() {
453            Some(conn.get_connection_ref())
454        } else {
455            None
456        }
457    }
458    pub async fn remove_edge_connection(&self, to: NodeId) {
459        let Some(_connection) = self.edge_connections.write().unwrap().remove(&to) else {
460            return;
461        };
462        let mut routing = self.edge_routing.write().await;
463        let mut offline_eps = Vec::new();
464        for (addr, (node_id, topic)) in routing.iter() {
465            if node_id == &to {
466                offline_eps.push((*addr, topic.clone()));
467            }
468        }
469        for (ref addr, _) in &offline_eps {
470            routing.remove(addr);
471        }
472        drop(routing);
473        let host = self.id();
474        for (addr, topic_code) in offline_eps {
475            let offline = EndpointOffline {
476                endpoint: addr,
477                host,
478                topic_code,
479            };
480            let node = self.clone();
481            tokio::spawn(async move {
482                let _ = node.propose(Proposal::EpOffline(offline)).await;
483            });
484        }
485    }
486    pub async fn check_ep_auth(&self, ep: &EndpointAddr, peer: &NodeId) -> Result<(), EdgeError> {
487        let routing = self.edge_routing.read().await;
488        if let Some((owner, _topic)) = routing.get(ep) {
489            if owner == peer {
490                Ok(())
491            } else {
492                Err(EdgeError::new(
493                    "endpoint not owned by sender",
494                    EdgeErrorKind::Unauthorized,
495                ))
496            }
497        } else {
498            Err(EdgeError::new(
499                "endpoint not found",
500                EdgeErrorKind::EndpointNotFound,
501            ))
502        }
503    }
504    pub(crate) async fn handle_edge_request(
505        &self,
506        from: NodeId,
507        edge_request_kind: edge::EdgeRequestEnum,
508    ) -> Result<edge::EdgeResponseEnum, edge::EdgeError> {
509        // auth check
510        if let Some(auth) = self.config.edge_auth.as_ref() {
511            if let Err(e) = auth.check(from, &edge_request_kind).await {
512                return Err(EdgeError::with_message(
513                    "auth check failed",
514                    e.to_string(),
515                    EdgeErrorKind::Unauthorized,
516                ));
517            }
518        }
519        match edge_request_kind {
520            edge::EdgeRequestEnum::SendMessage(edge_message) => {
521                let (message, topic_code) = edge_message.into_message();
522
523                let handle = self
524                    .send_message(topic_code, message)
525                    .map_err(|e| {
526                        EdgeError::with_message(
527                            "send message",
528                            e.to_string(),
529                            EdgeErrorKind::Internal,
530                        )
531                    })
532                    .await?;
533                let response = handle.await;
534                Ok(edge::EdgeResponseEnum::SendMessage(EdgeResult::from_std(
535                    response,
536                )))
537            }
538            edge::EdgeRequestEnum::EndpointOnline(online) => {
539                let topic_code = online.topic_code.clone();
540                let Some(node) = self.node_ref().upgrade() else {
541                    return Err(EdgeError::with_message(
542                        "set state",
543                        "node dropped",
544                        EdgeErrorKind::Internal,
545                    ));
546                };
547                let endpoint = EndpointAddr::new_snowflake();
548                self.set_edge_routing(endpoint, from, topic_code.clone())
549                    .await;
550                tracing::info!(?online, ?endpoint, "edge endpoint online");
551                let result = node
552                    .propose(Proposal::EpOnline(EndpointOnline {
553                        topic_code,
554                        interests: online.interests,
555                        endpoint,
556                        host: node.id(),
557                    }))
558                    .await;
559                if let Err(e) = result {
560                    // rollback edge routing change
561                    self.remove_edge_routing(&endpoint).await;
562                    return Err(EdgeError::with_message(
563                        "endpoint online",
564                        e.to_string(),
565                        EdgeErrorKind::Internal,
566                    ));
567                }
568                Ok(edge::EdgeResponseEnum::EndpointOnline(endpoint))
569            }
570            edge::EdgeRequestEnum::EndpointOffline(offline) => {
571                let topic_code = offline.topic_code.clone();
572
573                let Some(node) = self.node_ref().upgrade() else {
574                    return Err(EdgeError::with_message(
575                        "set state",
576                        "node dropped",
577                        EdgeErrorKind::Internal,
578                    ));
579                };
580                self.check_ep_auth(&offline.endpoint, &from).await?;
581                node.propose(Proposal::EpOffline(EndpointOffline {
582                    topic_code: topic_code.clone(),
583                    endpoint: offline.endpoint,
584                    host: from,
585                }))
586                .await
587                .map_err(|e| {
588                    EdgeError::with_message(
589                        "endpoint offline",
590                        e.to_string(),
591                        EdgeErrorKind::Internal,
592                    )
593                })?;
594                tracing::error!("proposed");
595                self.remove_edge_routing(&offline.endpoint).await;
596                tracing::error!("finished");
597                Ok(edge::EdgeResponseEnum::EndpointOffline)
598            }
599            edge::EdgeRequestEnum::EndpointInterest(interest) => {
600                let Some(node) = self.node_ref().upgrade() else {
601                    return Err(EdgeError::with_message(
602                        "set state",
603                        "node dropped",
604                        EdgeErrorKind::Internal,
605                    ));
606                };
607                self.check_ep_auth(&interest.endpoint, &from).await?;
608                node.propose(Proposal::EpInterest(interest.clone()))
609                    .await
610                    .map_err(|e| {
611                        EdgeError::with_message(
612                            "endpoint interest",
613                            e.to_string(),
614                            EdgeErrorKind::Internal,
615                        )
616                    })?;
617                Ok(edge::EdgeResponseEnum::EndpointInterest)
618            }
619            edge::EdgeRequestEnum::SetState(set_state) => {
620                let Some(node) = self.node_ref().upgrade() else {
621                    return Err(EdgeError::with_message(
622                        "set state",
623                        "node dropped",
624                        EdgeErrorKind::Internal,
625                    ));
626                };
627                for ep in set_state.update.status.keys() {
628                    self.check_ep_auth(ep, &from).await?;
629                }
630                node.propose(Proposal::SetState(set_state.clone()))
631                    .await
632                    .map_err(|e| {
633                        EdgeError::with_message("set state", e.to_string(), EdgeErrorKind::Internal)
634                    })?;
635                Ok(edge::EdgeResponseEnum::SetState)
636            }
637        }
638    }
639
640    pub async fn is_leader(&self) -> bool {
641        let raft = self.raft().await;
642        raft.ensure_linearizable().await.is_ok()
643    }
644    pub async fn wait_for_leader(&self) -> Result<(), crate::Error> {
645        let raft = self.raft().await;
646        tracing::info!("wait for leader to be elected");
647        raft.wait(None)
648            .metrics(|rm| rm.current_leader.is_some(), "wait for leader")
649            .await
650            .map_err(crate::Error::contextual_custom("wait for leader"))?;
651        Ok(())
652    }
653    pub async fn load_topic<C: Into<TopicConfig>>(
654        &self,
655        config: C,
656        queue: Vec<DurableMessage>,
657    ) -> Result<(), crate::Error> {
658        let config: TopicConfig = config.into();
659        tracing::info!(?config, "load_topic");
660        self.propose(Proposal::LoadTopic(LoadTopic { config, queue }))
661            .await?;
662        Ok(())
663    }
664    pub async fn create_new_topic<C: Into<TopicConfig>>(&self, config: C) -> crate::Result<()> {
665        self.load_topic(config, Vec::new()).await
666    }
667    /// Create the wait handle of a specific message
668    pub(crate) async fn remove_wait_ack(&self, id: MessageId) {
669        self.ack_waiting_pool.write().await.remove(&id);
670    }
671
672    /// Create the wait handle of a specific message
673    pub async fn set_wait_ack(&self, id: MessageId) -> WaitAckHandle {
674        let (sender, handle) = WaitAckHandle::new(id);
675        self.ack_waiting_pool.write().await.insert(id, sender);
676        handle
677    }
678
679    /// Send a message out, and get a awaitable handle.
680    pub async fn send_message(
681        &self,
682        topic: TopicCode,
683        message: Message,
684    ) -> Result<WaitAckHandle, crate::Error> {
685        let id = message.id();
686        let source = self.id();
687        let handle = self.set_wait_ack(id).await;
688        let proposal_result = self
689            .propose(Proposal::DelegateMessage(DelegateMessage {
690                topic,
691                message,
692                source,
693            }))
694            .await;
695        if proposal_result.is_err() {
696            self.remove_wait_ack(id).await;
697            tracing::warn!(?proposal_result, "proposal error");
698            proposal_result?;
699        }
700        Ok(handle)
701    }
702    pub async fn snapshot_data(&self) -> Result<NodeData, crate::Error> {
703        let raft = self.raft().await;
704        let mut snapshot = raft
705            .get_snapshot()
706            .await
707            .map_err(crate::Error::contextual_custom("get snapshot"))?
708            .ok_or(crate::Error::custom(
709                "get snapshot",
710                "raft instance missing snapshot",
711            ))?;
712        let data = bincode::serde::decode_from_std_read::<NodeData, _, _>(
713            &mut snapshot.snapshot,
714            BINCODE_CONFIG,
715        )
716        .map_err(crate::Error::contextual_custom("get snapshot"))?;
717        Ok(data)
718    }
719}
720
721#[derive(Debug, Clone, Serialize, Deserialize)]
722pub struct N2nRoutingInfo {
723    next_jump: NodeId,
724    hops: u32,
725}
726
727pub struct Connection {
728    pub attached_node: sync::Weak<NodeInner>,
729    pub peer_info: NodeConfig,
730}
731
732impl Drop for NodeInner {
733    fn drop(&mut self) {}
734}