use std::sync::Arc;
use bytes::Bytes;
use serde::Serialize;
use net::event::RawEvent;
use net::{ConsumeRequest, Event, EventBus, StoredEvent};
use crate::config::NetBuilder;
use crate::error::Result;
use crate::stream::{EventStream, SubscribeOpts, TypedEventStream};
/// Acknowledgement produced by the single-event emit methods on [`Net`].
///
/// Both fields are copied verbatim from the `(shard_id, timestamp)` pair that the
/// underlying bus returns from its `ingest` / `ingest_raw` calls.
#[derive(Debug, Clone, Copy)]
pub struct Receipt {
/// Shard identifier reported by the bus for the ingested event.
pub shard_id: u16,
/// Timestamp reported by the bus for the ingested event.
/// NOTE(review): unit and epoch are defined by the bus and are not visible here — confirm.
pub timestamp: u64,
}
/// Point-in-time copy of the bus counters, as returned by [`Net::stats`].
///
/// Each field is read from an atomic counter of the same name on the bus statistics
/// object. The fields are loaded one after another with relaxed ordering, so the
/// three values are not taken as a single atomic snapshot.
#[derive(Debug, Clone, Copy)]
pub struct Stats {
/// Value of the bus `events_ingested` counter at read time.
pub events_ingested: u64,
/// Value of the bus `events_dropped` counter at read time.
pub events_dropped: u64,
/// Value of the bus `batches_dispatched` counter at read time.
pub batches_dispatched: u64,
}
/// Parameters for [`Net::poll`].
///
/// `limit` seeds a `ConsumeRequest`; every `Some` option below is then forwarded to
/// the builder method of the same purpose, and `None` options are left untouched.
#[derive(Debug, Clone, Default)]
pub struct PollRequest {
/// Passed to `ConsumeRequest::new`. The derived `Default` sets this to `0`.
/// NOTE(review): presumably the maximum number of events to return — confirm how the
/// bus treats a limit of zero.
pub limit: usize,
/// When set, forwarded to `ConsumeRequest::from`.
/// NOTE(review): presumably a resume position such as a previous `next_id` — confirm.
pub cursor: Option<String>,
/// When set, forwarded to `ConsumeRequest::filter`.
pub filter: Option<net::Filter>,
/// When set, forwarded to `ConsumeRequest::ordering`.
pub ordering: Option<net::consumer::Ordering>,
/// When set, forwarded to `ConsumeRequest::shards`.
/// NOTE(review): presumably restricts the poll to these shard ids — confirm.
pub shards: Option<Vec<u16>>,
}
/// Result of [`Net::poll`].
///
/// All three fields are moved unchanged out of the response returned by the bus.
#[derive(Debug, Clone)]
pub struct PollResponse {
/// Events returned by the bus for this poll.
pub events: Vec<StoredEvent>,
/// `next_id` reported by the bus.
/// NOTE(review): presumably usable as `PollRequest::cursor` for the next poll — confirm.
pub next_id: Option<String>,
/// `has_more` flag reported by the bus.
pub has_more: bool,
}
/// High-level handle around an [`EventBus`].
///
/// Offers emit, poll, subscribe, stats and lifecycle methods that delegate to the bus.
pub struct Net {
// Reference-counted so `subscribe` / `subscribe_typed` can pass a clone of the
// handle to the streams they create.
bus: Arc<EventBus>,
}
impl Net {
pub fn builder() -> NetBuilder {
NetBuilder::new()
}
pub fn from_bus(bus: EventBus) -> Self {
Self { bus: Arc::new(bus) }
}
pub(crate) async fn from_builder(builder: NetBuilder) -> Result<Self> {
let config = builder.build_config()?;
let bus = EventBus::new(config).await?;
Ok(Self { bus: Arc::new(bus) })
}
pub fn emit<T: Serialize>(&self, event: &T) -> Result<Receipt> {
let value = serde_json::to_value(event)?;
let e = Event::new(value);
let (shard_id, timestamp) = self.bus.ingest(e)?;
Ok(Receipt {
shard_id,
timestamp,
})
}
pub fn emit_raw(&self, bytes: impl Into<Bytes>) -> Result<Receipt> {
let raw = RawEvent::from_bytes(bytes);
let (shard_id, timestamp) = self.bus.ingest_raw(raw)?;
Ok(Receipt {
shard_id,
timestamp,
})
}
pub fn emit_str(&self, json: &str) -> Result<Receipt> {
let raw = RawEvent::from_str(json);
let (shard_id, timestamp) = self.bus.ingest_raw(raw)?;
Ok(Receipt {
shard_id,
timestamp,
})
}
pub fn emit_batch<T: Serialize>(&self, events: &[T]) -> Result<usize> {
let mut raw_events = Vec::with_capacity(events.len());
for event in events {
let value = serde_json::to_value(event)?;
raw_events.push(RawEvent::from_value(value));
}
Ok(self.bus.ingest_raw_batch(raw_events))
}
pub fn emit_raw_batch(&self, events: Vec<Bytes>) -> usize {
let raw_events: Vec<RawEvent> = events.into_iter().map(RawEvent::from_bytes).collect();
self.bus.ingest_raw_batch(raw_events)
}
pub async fn poll(&self, request: PollRequest) -> Result<PollResponse> {
let mut req = ConsumeRequest::new(request.limit);
if let Some(cursor) = request.cursor {
req = req.from(cursor);
}
if let Some(filter) = request.filter {
req = req.filter(filter);
}
if let Some(ordering) = request.ordering {
req = req.ordering(ordering);
}
if let Some(shards) = request.shards {
req = req.shards(shards);
}
let response = self.bus.poll(req).await?;
Ok(PollResponse {
events: response.events,
next_id: response.next_id,
has_more: response.has_more,
})
}
pub fn subscribe(&self, opts: SubscribeOpts) -> EventStream {
EventStream::new(self.bus.clone(), opts)
}
pub fn subscribe_typed<T: serde::de::DeserializeOwned>(
&self,
opts: SubscribeOpts,
) -> TypedEventStream<T> {
TypedEventStream::new(self.bus.clone(), opts)
}
pub fn stats(&self) -> Stats {
let s = self.bus.stats();
Stats {
events_ingested: s.events_ingested.load(std::sync::atomic::Ordering::Relaxed),
events_dropped: s.events_dropped.load(std::sync::atomic::Ordering::Relaxed),
batches_dispatched: s
.batches_dispatched
.load(std::sync::atomic::Ordering::Relaxed),
}
}
pub fn shards(&self) -> u16 {
self.bus.num_shards()
}
pub async fn health(&self) -> bool {
self.bus.is_healthy().await
}
pub async fn flush(&self) -> Result<()> {
self.bus.flush().await?;
Ok(())
}
pub async fn shutdown(self) -> Result<()> {
self.bus.shutdown_via_ref().await?;
Ok(())
}
pub fn bus(&self) -> &EventBus {
&self.bus
}
}