1use std::marker::PhantomData;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::core::marker::{Edge, NodeKind, Proxy};
8use crate::core::node::{NodeBuilder, NodeConf};
9use crate::core::utils::Guarded;
10use crate::error::{NodeError, RecvResult, RecvTimeoutResult, TryRecvResult};
11use crate::protocol::{Peer, Unset};
12use crate::sync::marker::ConnConf;
13
14use crate::prelude::*;
15use crate::sync::prelude::*;
16
17impl Node<Proxy, Versionless, Unset> {
18 pub fn sync<V: MaybeVersioned>() -> NodeBuilder<Unset, Unset, V, Unset, SyncApi<V>> {
25 NodeBuilder::synchronous().version::<V>()
26 }
27}
28
29impl<K: NodeKind, V: MaybeVersioned> Node<K, V, SyncApi<V>> {
30 pub fn try_from_conf(conf: NodeConf<K, V, ConnConf<V>>) -> Result<Self> {
35 let (conn, conn_handler) = conf.connection().build()?;
36
37 let processor = Arc::new(conf.make_processor());
38 let api = SyncApi::new(conn, processor.clone());
39
40 let state = api.share_state();
41 let is_active = Guarded::from(&state);
42
43 let node = Self {
44 kind: conf.kind,
45 api,
46 state,
47 is_active,
48 heartbeat_timeout: conf.heartbeat_timeout,
49 heartbeat_interval: conf.heartbeat_interval,
50 processor,
51 _version: PhantomData,
52 };
53
54 node.api.start_default_handlers(node.heartbeat_timeout);
55 node.api.handle_conn_stop(conn_handler);
56
57 Ok(node)
58 }
59
60 pub fn has_peers(&self) -> bool {
65 self.api.has_peers()
66 }
67
68 pub fn peers(&self) -> impl Iterator<Item = Peer> {
75 self.api.peers()
76 }
77
78 #[inline(always)]
88 pub fn receiver(&self) -> &EventReceiver<V> {
89 self.api.event_receiver()
90 }
91
92 #[inline(always)]
93 pub(in crate::sync) fn frame_sender(&self) -> &FrameSender<V, Proxy> {
94 self.api.frame_sender()
95 }
96}
97
98impl<V: MaybeVersioned> Node<Proxy, V, SyncApi<V>> {
99 pub fn sender(&self) -> FrameSender<V, Proxy> {
108 self.api.frame_sender().clone()
109 }
110}
111
112impl<V: MaybeVersioned> Node<Edge<V>, V, SyncApi<V>> {
113 pub fn sender(&self) -> FrameSender<V, Edge<V>> {
119 self.api.frame_sender().clone().into_edge(self.kind.clone())
120 }
121}
122
123impl<V: Versioned> Node<Edge<V>, V, SyncApi<V>> {
124 pub fn activate(&mut self) -> Result<()> {
135 if self.state.is_closed() {
136 return Err(Error::Node(NodeError::Inactive));
137 }
138
139 if self.is_active.is() {
140 return Ok(());
141 }
142
143 self.is_active.set(true);
144
145 self.api.start_sending_heartbeats(
146 self.kind.endpoint.clone(),
147 self.heartbeat_interval,
148 self.is_active.clone(),
149 self.dialect().version(),
150 );
151
152 Ok(())
153 }
154}
155
156impl<K: NodeKind, V: MaybeVersioned> ReceiveEvent<V> for Node<K, V, SyncApi<V>> {
157 #[inline(always)]
158 fn recv(&self) -> RecvResult<Event<V>> {
159 self.receiver().recv()
160 }
161
162 #[inline(always)]
163 fn recv_timeout(&self, timeout: Duration) -> RecvTimeoutResult<Event<V>> {
164 self.receiver().recv_timeout(timeout)
165 }
166
167 #[inline(always)]
168 fn try_recv(&self) -> TryRecvResult<Event<V>> {
169 self.receiver().try_recv()
170 }
171
172 #[inline(always)]
173 fn events(&self) -> impl Iterator<Item = Event<V>> {
174 self.receiver().events()
175 }
176}
177
178impl<K: NodeKind, V: MaybeVersioned> ReceiveFrame<V> for Node<K, V, SyncApi<V>> {}