1use std::sync::Arc;
4
5use bytes::Bytes;
6use serde::Serialize;
7
8use net::event::RawEvent;
9use net::{ConsumeRequest, Event, EventBus, StoredEvent};
10
11use crate::config::NetBuilder;
12use crate::error::Result;
13use crate::stream::{EventStream, SubscribeOpts, TypedEventStream};
14
15#[derive(Debug, Clone, Copy)]
17pub struct Receipt {
18 pub shard_id: u16,
20 pub timestamp: u64,
22}
23
24#[derive(Debug, Clone, Copy)]
26pub struct Stats {
27 pub events_ingested: u64,
29 pub events_dropped: u64,
31 pub batches_dispatched: u64,
33}
34
35#[derive(Debug, Clone, Default)]
37pub struct PollRequest {
38 pub limit: usize,
40 pub cursor: Option<String>,
42 pub filter: Option<net::Filter>,
44 pub ordering: Option<net::consumer::Ordering>,
46 pub shards: Option<Vec<u16>>,
48}
49
50#[derive(Debug, Clone)]
52pub struct PollResponse {
53 pub events: Vec<StoredEvent>,
55 pub next_id: Option<String>,
57 pub has_more: bool,
59}
60
61pub struct Net {
91 bus: Arc<EventBus>,
92}
93
94impl Net {
95 pub fn builder() -> NetBuilder {
97 NetBuilder::new()
98 }
99
100 pub fn from_bus(bus: EventBus) -> Self {
102 Self { bus: Arc::new(bus) }
103 }
104
105 pub(crate) async fn from_builder(builder: NetBuilder) -> Result<Self> {
107 let config = builder.build_config()?;
108 let bus = EventBus::new(config).await?;
109 Ok(Self { bus: Arc::new(bus) })
110 }
111
112 pub fn emit<T: Serialize>(&self, event: &T) -> Result<Receipt> {
118 let value = serde_json::to_value(event)?;
119 let e = Event::new(value);
120 let (shard_id, timestamp) = self.bus.ingest(e)?;
121 Ok(Receipt {
122 shard_id,
123 timestamp,
124 })
125 }
126
127 pub fn emit_raw(&self, bytes: impl Into<Bytes>) -> Result<Receipt> {
129 let raw = RawEvent::from_bytes(bytes);
130 let (shard_id, timestamp) = self.bus.ingest_raw(raw)?;
131 Ok(Receipt {
132 shard_id,
133 timestamp,
134 })
135 }
136
137 pub fn emit_str(&self, json: &str) -> Result<Receipt> {
139 let raw = RawEvent::from_str(json);
140 let (shard_id, timestamp) = self.bus.ingest_raw(raw)?;
141 Ok(Receipt {
142 shard_id,
143 timestamp,
144 })
145 }
146
147 pub fn emit_batch<T: Serialize>(&self, events: &[T]) -> Result<usize> {
149 let mut raw_events = Vec::with_capacity(events.len());
150 for event in events {
151 let value = serde_json::to_value(event)?;
152 raw_events.push(RawEvent::from_value(value));
153 }
154 Ok(self.bus.ingest_raw_batch(raw_events))
155 }
156
157 pub fn emit_raw_batch(&self, events: Vec<Bytes>) -> usize {
159 let raw_events: Vec<RawEvent> = events.into_iter().map(RawEvent::from_bytes).collect();
160 self.bus.ingest_raw_batch(raw_events)
161 }
162
163 pub async fn poll(&self, request: PollRequest) -> Result<PollResponse> {
167 let mut req = ConsumeRequest::new(request.limit);
168 if let Some(cursor) = request.cursor {
169 req = req.from(cursor);
170 }
171 if let Some(filter) = request.filter {
172 req = req.filter(filter);
173 }
174 if let Some(ordering) = request.ordering {
175 req = req.ordering(ordering);
176 }
177 if let Some(shards) = request.shards {
178 req = req.shards(shards);
179 }
180 let response = self.bus.poll(req).await?;
181 Ok(PollResponse {
182 events: response.events,
183 next_id: response.next_id,
184 has_more: response.has_more,
185 })
186 }
187
188 pub fn subscribe(&self, opts: SubscribeOpts) -> EventStream {
192 EventStream::new(self.bus.clone(), opts)
193 }
194
195 pub fn subscribe_typed<T: serde::de::DeserializeOwned>(
199 &self,
200 opts: SubscribeOpts,
201 ) -> TypedEventStream<T> {
202 TypedEventStream::new(self.bus.clone(), opts)
203 }
204
205 pub fn stats(&self) -> Stats {
209 let s = self.bus.stats();
210 Stats {
211 events_ingested: s.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
212 events_dropped: s.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
213 batches_dispatched: s
214 .batches_dispatched
215 .load(std::sync::atomic::Ordering::Relaxed),
216 }
217 }
218
219 pub fn shards(&self) -> u16 {
221 self.bus.num_shards()
222 }
223
224 pub async fn health(&self) -> bool {
226 self.bus.is_healthy().await
227 }
228
229 pub async fn flush(&self) -> Result<()> {
231 self.bus.flush().await?;
232 Ok(())
233 }
234
235 pub async fn shutdown(self) -> Result<()> {
245 self.bus.shutdown_via_ref().await?;
246 Ok(())
247 }
248
249 pub fn bus(&self) -> &EventBus {
251 &self.bus
252 }
253}