use futures::future;
use hyper::body::{Bytes, Sender};
use std::mem;
use std::fmt::{self, Display, Formatter};
/// Builder for a single Server-Sent Events message.
///
/// The builder only borrows its inputs; nothing is copied until
/// [`build`](EventBuilder::build) renders the wire format.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EventBuilder<'data, 'id, 'event> {
    /// Payload of the event; every line of it becomes its own `data:` field.
    pub data: &'data str,
    /// Value of the optional `id:` field.
    pub id: Option<&'id str>,
    /// Value of the optional `event:` field.
    pub event_type: Option<&'event str>,
}

impl<'data, 'id, 'event> EventBuilder<'data, 'id, 'event> {
    /// Creates a builder carrying `data`, with no id and no event type.
    pub fn new(data: &'data str) -> Self {
        Self {
            data,
            id: None,
            event_type: None,
        }
    }

    /// Replaces the payload.
    pub fn data(mut self, data: &'data str) -> Self {
        self.data = data;
        self
    }

    /// Sets the value of the `id:` field.
    pub fn id(mut self, id: &'id str) -> Self {
        self.id = Some(id);
        self
    }

    /// Sets the value of the `event:` field.
    pub fn event_type(mut self, event_type: &'event str) -> Self {
        self.event_type = Some(event_type);
        self
    }

    /// Removes a previously set id.
    pub fn clear_id(mut self) -> Self {
        self.id = None;
        self
    }

    /// Removes a previously set event type.
    pub fn clear_type(mut self) -> Self {
        self.event_type = None;
        self
    }

    /// Renders the event in the `text/event-stream` wire format.
    ///
    /// The output consists of the optional `id:` and `event:` fields, one
    /// `data:` field per line of the payload (split with [`str::lines`], so on
    /// `\n` and `\r\n`), and a terminating blank line.
    ///
    /// * An empty payload is rendered as a single empty `data:` field. An
    ///   event without any `data:` field is discarded by event-stream
    ///   clients, which would silently lose the event and its type.
    /// * Carriage returns and line feeds inside the id or the event type are
    ///   dropped: those fields are single-line, and a line break there would
    ///   end the field early and let the remainder be read as further fields.
    pub fn build(self) -> String {
        const ID_PREFIX: &str = "id: ";
        const EVENT_PREFIX: &str = "event: ";
        const DATA_PREFIX: &str = "data: ";

        // Every emitted field costs its prefix, its value and one '\n'.
        let field_len = |prefix: &str, value: &str| prefix.len() + value.len() + 1;
        let data_len: usize = if self.data.is_empty() {
            field_len(DATA_PREFIX, "")
        } else {
            self.data
                .lines()
                .map(|line| field_len(DATA_PREFIX, line))
                .sum()
        };
        let capacity = self.id.map_or(0, |id| field_len(ID_PREFIX, id))
            + self.event_type.map_or(0, |ty| field_len(EVENT_PREFIX, ty))
            + data_len
            + 1; // the blank line that terminates the event

        let mut event = String::with_capacity(capacity);
        if let Some(id) = self.id {
            Self::push_single_line(&mut event, ID_PREFIX, id);
        }
        if let Some(event_type) = self.event_type {
            Self::push_single_line(&mut event, EVENT_PREFIX, event_type);
        }
        if self.data.is_empty() {
            // `"".lines()` yields nothing, so the empty field is written explicitly.
            event.push_str(DATA_PREFIX);
            event.push('\n');
        } else {
            for line in self.data.lines() {
                event.push_str(DATA_PREFIX);
                event.push_str(line);
                event.push('\n');
            }
        }
        event.push('\n');
        event
    }

    /// Appends `prefix`, then `value` with every `\r` and `\n` removed, then `\n`.
    fn push_single_line(out: &mut String, prefix: &str, value: &str) {
        out.push_str(prefix);
        for piece in value.split(|c: char| c == '\r' || c == '\n') {
            out.push_str(piece);
        }
        out.push('\n');
    }
}
/// Formats the event as the exact text returned by `EventBuilder::build`.
impl Display for EventBuilder<'_, '_, '_> {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // The builder is `Copy`, so rendering a copy leaves `self` untouched.
        let rendered = (*self).build();
        f.write_str(rendered.as_str())
    }
}
impl<'data, 'id, 'event> Into<Bytes> for EventBuilder<'data, 'id, 'event> {
fn into(self) -> Bytes {
self.build().into()
}
}
/// Keeps the body senders of the registered clients and sends data to all of them.
#[derive(Debug, Default)]
pub struct Server {
/// One body `Sender` per registered client; senders whose send fails are
/// removed by `send_to_clients`.
clients: Vec<Sender>,
}
impl Server {
    /// Creates a server with no registered clients.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `client` so that it receives every subsequent send.
    pub fn add_client(&mut self, client: Sender) {
        self.clients.push(client);
    }

    /// Sends `text` to every registered client and forgets the clients whose
    /// send failed.
    ///
    /// All sends are awaited together. Returns the number of clients that are
    /// still registered afterwards.
    pub async fn send_to_clients<B: Into<Bytes>>(&mut self, text: B) -> usize {
        let payload: Bytes = text.into();

        // One future per client, each owning its own handle to the payload.
        let deliveries = self.clients.iter_mut().map(|client| {
            let chunk = payload.slice(..);
            async move { client.send_data(chunk).await.is_ok() }
        });
        let outcomes = future::join_all(deliveries).await;

        // `join_all` yields results in client order and `retain` visits the
        // clients in that same order, so the n-th outcome belongs to the n-th
        // client and the iterator cannot run dry.
        let mut outcomes = outcomes.into_iter();
        self.clients.retain(|_| outcomes.next().unwrap());

        self.connections()
    }

    /// Sends a comment-only message (`":\n\n"`) to every client.
    ///
    /// Returns the number of clients still registered afterwards.
    pub async fn send_heartbeat(&mut self) -> usize {
        const HEARTBEAT: &str = ":\n\n";
        self.send_to_clients(HEARTBEAT).await
    }

    /// Aborts the body of every registered client and leaves the server empty.
    pub fn disconnect_all(&mut self) {
        mem::take(&mut self.clients)
            .into_iter()
            .for_each(|client| client.abort());
    }

    /// Returns the number of currently registered clients.
    pub fn connections(&self) -> usize {
        self.clients.len()
    }
}