Skip to main content

net_sdk/
net.rs

1//! The main SDK handle.
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use serde::Serialize;
7
8use net::event::RawEvent;
9use net::{ConsumeRequest, Event, EventBus, StoredEvent};
10
11use crate::config::NetBuilder;
12use crate::error::Result;
13use crate::stream::{EventStream, SubscribeOpts, TypedEventStream};
14
/// Receipt returned by a successful single-event ingestion.
///
/// This is plain `Copy` data. Equality and hashing are derived so that
/// receipts can be compared directly (e.g. in tests) or stored in
/// sets and maps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Receipt {
    /// The shard the event was assigned to.
    pub shard_id: u16,
    /// Insertion timestamp (nanoseconds).
    pub timestamp: u64,
}
23
/// Ingestion statistics.
///
/// A plain-value copy of the ingestion counters. `Default` yields an
/// all-zero value, and equality is derived so two snapshots can be
/// compared directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Stats {
    /// Total events ingested.
    pub events_ingested: u64,
    /// Events dropped due to backpressure.
    pub events_dropped: u64,
    /// Batches dispatched to the adapter.
    pub batches_dispatched: u64,
}
34
35/// Options for a one-shot poll request.
36#[derive(Debug, Clone, Default)]
37pub struct PollRequest {
38    /// Maximum number of events to return.
39    pub limit: usize,
40    /// Cursor to resume from.
41    pub cursor: Option<String>,
42    /// Optional event filter.
43    pub filter: Option<net::Filter>,
44    /// Event ordering.
45    pub ordering: Option<net::consumer::Ordering>,
46    /// Specific shards to poll.
47    pub shards: Option<Vec<u16>>,
48}
49
50/// Response from a poll request.
51#[derive(Debug, Clone)]
52pub struct PollResponse {
53    /// Events returned.
54    pub events: Vec<StoredEvent>,
55    /// Cursor for the next poll.
56    pub next_id: Option<String>,
57    /// Whether more events are available.
58    pub has_more: bool,
59}
60
61/// A node on the Net mesh.
62///
63/// This is the main SDK handle. Every computer, device, and application
64/// is a `Net` node — there are no clients, no servers, no coordinators.
65///
66/// # Example
67///
68/// ```rust,no_run
69/// use net_sdk::{Net, Backpressure};
70///
71/// # async fn example() -> net_sdk::error::Result<()> {
72/// let node = Net::builder()
73///     .shards(4)
74///     .backpressure(Backpressure::DropOldest)
75///     .memory()
76///     .build()
77///     .await?;
78///
79/// node.emit(&serde_json::json!({"token": "hello"}))?;
80///
81/// let response = node.poll(net_sdk::PollRequest { limit: 100, ..Default::default() }).await?;
82/// for event in &response.events {
83///     println!("{}", event.raw_str().unwrap_or("<non-utf8>"));
84/// }
85///
86/// node.shutdown().await?;
87/// # Ok(())
88/// # }
89/// ```
90pub struct Net {
91    bus: Arc<EventBus>,
92}
93
94impl Net {
95    /// Create a builder for configuring a new node.
96    pub fn builder() -> NetBuilder {
97        NetBuilder::new()
98    }
99
100    /// Create a node from an existing `EventBus`.
101    pub fn from_bus(bus: EventBus) -> Self {
102        Self { bus: Arc::new(bus) }
103    }
104
105    /// Build and start a new node from a builder.
106    pub(crate) async fn from_builder(builder: NetBuilder) -> Result<Self> {
107        let config = builder.build_config()?;
108        let bus = EventBus::new(config).await?;
109        Ok(Self { bus: Arc::new(bus) })
110    }
111
112    // ---- Ingestion ----
113
114    /// Emit a serializable event.
115    ///
116    /// Serializes `T` to JSON via serde and ingests it.
117    pub fn emit<T: Serialize>(&self, event: &T) -> Result<Receipt> {
118        let value = serde_json::to_value(event)?;
119        let e = Event::new(value);
120        let (shard_id, timestamp) = self.bus.ingest(e)?;
121        Ok(Receipt {
122            shard_id,
123            timestamp,
124        })
125    }
126
127    /// Emit raw bytes (fastest path).
128    pub fn emit_raw(&self, bytes: impl Into<Bytes>) -> Result<Receipt> {
129        let raw = RawEvent::from_bytes(bytes);
130        let (shard_id, timestamp) = self.bus.ingest_raw(raw)?;
131        Ok(Receipt {
132            shard_id,
133            timestamp,
134        })
135    }
136
137    /// Emit a raw JSON string.
138    pub fn emit_str(&self, json: &str) -> Result<Receipt> {
139        let raw = RawEvent::from_str(json);
140        let (shard_id, timestamp) = self.bus.ingest_raw(raw)?;
141        Ok(Receipt {
142            shard_id,
143            timestamp,
144        })
145    }
146
147    /// Emit a batch of serializable events. Returns the number ingested.
148    pub fn emit_batch<T: Serialize>(&self, events: &[T]) -> Result<usize> {
149        let mut raw_events = Vec::with_capacity(events.len());
150        for event in events {
151            let value = serde_json::to_value(event)?;
152            raw_events.push(RawEvent::from_value(value));
153        }
154        Ok(self.bus.ingest_raw_batch(raw_events))
155    }
156
157    /// Emit a batch of raw byte events. Returns the number ingested.
158    pub fn emit_raw_batch(&self, events: Vec<Bytes>) -> usize {
159        let raw_events: Vec<RawEvent> = events.into_iter().map(RawEvent::from_bytes).collect();
160        self.bus.ingest_raw_batch(raw_events)
161    }
162
163    // ---- Consumption ----
164
165    /// One-shot poll for events.
166    pub async fn poll(&self, request: PollRequest) -> Result<PollResponse> {
167        let mut req = ConsumeRequest::new(request.limit);
168        if let Some(cursor) = request.cursor {
169            req = req.from(cursor);
170        }
171        if let Some(filter) = request.filter {
172            req = req.filter(filter);
173        }
174        if let Some(ordering) = request.ordering {
175            req = req.ordering(ordering);
176        }
177        if let Some(shards) = request.shards {
178            req = req.shards(shards);
179        }
180        let response = self.bus.poll(req).await?;
181        Ok(PollResponse {
182            events: response.events,
183            next_id: response.next_id,
184            has_more: response.has_more,
185        })
186    }
187
188    /// Subscribe to a stream of events.
189    ///
190    /// Returns an async `Stream` that yields events with adaptive backoff.
191    pub fn subscribe(&self, opts: SubscribeOpts) -> EventStream {
192        EventStream::new(self.bus.clone(), opts)
193    }
194
195    /// Subscribe to a typed stream of events.
196    ///
197    /// Each event is automatically deserialized into `T`.
198    pub fn subscribe_typed<T: serde::de::DeserializeOwned>(
199        &self,
200        opts: SubscribeOpts,
201    ) -> TypedEventStream<T> {
202        TypedEventStream::new(self.bus.clone(), opts)
203    }
204
205    // ---- Lifecycle ----
206
207    /// Get ingestion statistics.
208    pub fn stats(&self) -> Stats {
209        let s = self.bus.stats();
210        Stats {
211            events_ingested: s.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
212            events_dropped: s.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
213            batches_dispatched: s
214                .batches_dispatched
215                .load(std::sync::atomic::Ordering::Relaxed),
216        }
217    }
218
219    /// Get the number of active shards.
220    pub fn shards(&self) -> u16 {
221        self.bus.num_shards()
222    }
223
224    /// Check if the node is healthy.
225    pub async fn health(&self) -> bool {
226        self.bus.is_healthy().await
227    }
228
229    /// Flush all pending batches to the adapter.
230    pub async fn flush(&self) -> Result<()> {
231        self.bus.flush().await?;
232        Ok(())
233    }
234
235    /// Gracefully shut down the node.
236    ///
237    /// Drains pending events through the adapter and signals all
238    /// background tasks to exit. Safe to call even when other
239    /// `Arc<EventBus>` clones exist (e.g. an outstanding
240    /// `EventStream` from `subscribe`): `EventBus::shutdown_via_ref`
241    /// is idempotent and reference-based, so the shutdown work runs
242    /// regardless of strong-ref count, and outstanding clones
243    /// observe the bus as shut down on their next operation.
244    pub async fn shutdown(self) -> Result<()> {
245        self.bus.shutdown_via_ref().await?;
246        Ok(())
247    }
248
249    /// Get a reference to the underlying `EventBus`.
250    pub fn bus(&self) -> &EventBus {
251        &self.bus
252    }
253}