teamtalk 6.0.0

TeamTalk SDK for Rust
Documentation
use crate::events::Event;
use crate::types::{ClientId, CommandId};
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{self, Receiver, Sender};
use std::time::{Duration, Instant, SystemTime};

#[derive(Debug, Clone)]
pub struct ClientEvent {
    pub client_id: ClientId,
    pub label: Option<String>,
    pub event: Event,
    pub command_id: Option<CommandId>,
    pub at: SystemTime,
}

#[derive(Debug, Clone, Default)]
pub struct ClientHealth {
    pub last_event: Option<Event>,
    pub last_event_at: Option<SystemTime>,
    pub last_poll_at: Option<SystemTime>,
}

pub struct ClientManager {
    clients: Vec<crate::client::Client>,
    health: HashMap<ClientId, ClientHealth>,
    queue: VecDeque<usize>,
    poll_timeout_ms: i32,
    tick_sleep: Duration,
    tx: Sender<ClientEvent>,
    rx: Receiver<ClientEvent>,
}

impl Default for ClientManager {
    fn default() -> Self {
        Self::new()
    }
}

impl ClientManager {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            clients: Vec::new(),
            health: std::collections::HashMap::new(),
            queue: VecDeque::new(),
            poll_timeout_ms: 0,
            tick_sleep: Duration::from_millis(1),
            tx,
            rx,
        }
    }

    pub fn add_client(&mut self, client: crate::client::Client) {
        let id = client.id();
        self.health.insert(id, ClientHealth::default());
        self.queue.push_back(self.clients.len());
        self.clients.push(client);
    }

    pub fn remove_client(&mut self, id: ClientId) {
        if let Some(pos) = self.clients.iter().position(|c| c.id() == id) {
            self.clients.remove(pos);
            self.health.remove(&id);
            self.queue = self
                .queue
                .iter()
                .filter_map(|&idx| {
                    if idx == pos {
                        None
                    } else if idx > pos {
                        Some(idx - 1)
                    } else {
                        Some(idx)
                    }
                })
                .collect();
        }
    }

    pub fn set_poll_timeout(&mut self, timeout_ms: i32) {
        self.poll_timeout_ms = timeout_ms;
    }

    pub fn set_tick_sleep(&mut self, sleep: Duration) {
        self.tick_sleep = sleep;
    }

    pub fn events(&self) -> &Receiver<ClientEvent> {
        &self.rx
    }

    pub fn health_snapshot(&self, id: ClientId) -> Option<ClientHealth> {
        self.health.get(&id).cloned()
    }

    pub fn run_once(&mut self) {
        let now = SystemTime::now();
        let mut processed = 0;
        let queue_len = self.queue.len();
        for _ in 0..queue_len {
            if let Some(idx) = self.queue.pop_front() {
                if let Some(client) = self.clients.get(idx) {
                    if let Some((event, msg)) = client.poll(self.poll_timeout_ms) {
                        let command_id = match event {
                            Event::CmdProcessing | Event::CmdError | Event::CmdSuccess => {
                                Some(CommandId(msg.source()))
                            }
                            _ => None,
                        };
                        let evt = ClientEvent {
                            client_id: client.id(),
                            label: client.label(),
                            event,
                            command_id,
                            at: now,
                        };
                        let _ = self.tx.send(evt.clone());
                        let entry = self.health.entry(client.id()).or_default();
                        entry.last_event = Some(event);
                        entry.last_event_at = Some(now);
                    }
                    let entry = self.health.entry(client.id()).or_default();
                    entry.last_poll_at = Some(now);
                }
                self.queue.push_back(idx);
                processed += 1;
            }
        }
        if processed == 0 {
            std::thread::sleep(self.tick_sleep);
        }
    }

    pub fn run(&mut self) -> ! {
        loop {
            let start = Instant::now();
            self.run_once();
            if self.tick_sleep > Duration::ZERO {
                let elapsed = start.elapsed();
                if elapsed < self.tick_sleep {
                    std::thread::sleep(self.tick_sleep - elapsed);
                }
            }
        }
    }

    pub fn wait_cmd(
        &mut self,
        client_id: ClientId,
        cmd_id: CommandId,
        timeout_ms: i32,
    ) -> Result<Event, WaitError> {
        let deadline = if timeout_ms < 0 {
            None
        } else {
            Some(Instant::now() + Duration::from_millis(timeout_ms as u64))
        };
        loop {
            self.run_once();
            while let Ok(evt) = self.rx.try_recv() {
                if evt.client_id == client_id
                    && evt.command_id == Some(cmd_id)
                    && matches!(evt.event, Event::CmdSuccess | Event::CmdError)
                {
                    return match evt.event {
                        Event::CmdSuccess => Ok(evt.event),
                        Event::CmdError => Err(WaitError::CommandFailed),
                        _ => unreachable!(),
                    };
                }
            }
            if let Some(deadline) = deadline
                && Instant::now() >= deadline
            {
                return Err(WaitError::Timeout);
            }
        }
    }

    pub fn wait_cmd_ok(
        &mut self,
        client_id: ClientId,
        cmd_id: CommandId,
        timeout_ms: i32,
    ) -> Result<(), WaitError> {
        self.wait_cmd(client_id, cmd_id, timeout_ms).map(|_| ())
    }

    pub fn wait_cmd_any(
        &mut self,
        cmd_id: CommandId,
        timeout_ms: i32,
    ) -> Result<ClientId, WaitError> {
        let deadline = if timeout_ms < 0 {
            None
        } else {
            Some(Instant::now() + Duration::from_millis(timeout_ms as u64))
        };
        loop {
            self.run_once();
            while let Ok(evt) = self.rx.try_recv() {
                if evt.command_id == Some(cmd_id)
                    && matches!(evt.event, Event::CmdSuccess | Event::CmdError)
                {
                    return match evt.event {
                        Event::CmdSuccess => Ok(evt.client_id),
                        Event::CmdError => Err(WaitError::CommandFailed),
                        _ => unreachable!(),
                    };
                }
            }
            if let Some(deadline) = deadline
                && Instant::now() >= deadline
            {
                return Err(WaitError::Timeout);
            }
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitError {
    Timeout,
    CommandFailed,
}