sim-lib-server 0.1.0

SIM workspace package for sim lib server.
Documentation
#[cfg(any(feature = "trigger-telegram", feature = "trigger-matrix"))]
use std::collections::VecDeque;
#[cfg(any(feature = "trigger-telegram", feature = "trigger-matrix"))]
use std::time::Duration;

#[cfg(any(feature = "trigger-telegram", feature = "trigger-matrix"))]
use sim_kernel::{Cx, Error, Result};

#[cfg(any(feature = "trigger-telegram", feature = "trigger-matrix"))]
use super::TriggerSource;
#[cfg(any(feature = "trigger-telegram", feature = "trigger-matrix"))]
use super::common::{connect_with_timeout, io_to_host};

#[cfg(feature = "trigger-telegram")]
pub(crate) struct TelegramSource {
    client: JsonPollingSource,
    chat_id: String,
    bot: String,
    offset: i64,
}

#[cfg(feature = "trigger-telegram")]
impl TelegramSource {
    pub(crate) fn new(bot: String, chat_id: String, base_url: String) -> Result<Self> {
        Ok(Self {
            client: JsonPollingSource::new(base_url)?,
            chat_id,
            bot,
            offset: 0,
        })
    }
}

#[cfg(feature = "trigger-telegram")]
impl TriggerSource for TelegramSource {
    fn next_event(&mut self, _cx: &mut Cx, timeout: Duration) -> Result<Option<Vec<u8>>> {
        if let Some(event) = self.client.pending.pop_front() {
            return Ok(Some(event));
        }
        let path = format!(
            "{}/bot{}/getUpdates?timeout={}&offset={}",
            self.client.base_path,
            self.bot,
            timeout.as_secs(),
            self.offset
        );
        let body = self.client.get(&path, timeout)?;
        let value: serde_json::Value =
            serde_json::from_slice(&body).map_err(|error| Error::HostError(error.to_string()))?;
        let mut max_update = self.offset;
        for item in value
            .get("result")
            .and_then(serde_json::Value::as_array)
            .into_iter()
            .flatten()
        {
            let update_id = item
                .get("update_id")
                .and_then(serde_json::Value::as_i64)
                .unwrap_or(max_update);
            max_update = max_update.max(update_id + 1);
            if item
                .pointer("/message/chat/id")
                .map(json_id_text)
                .as_deref()
                == Some(self.chat_id.as_str())
            {
                self.client.pending.push_back(
                    serde_json::to_vec(item)
                        .map_err(|error| Error::HostError(error.to_string()))?,
                );
            }
        }
        self.offset = max_update;
        Ok(self.client.pending.pop_front())
    }

    #[cfg(test)]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

#[cfg(feature = "trigger-matrix")]
pub(crate) struct MatrixSource {
    client: JsonPollingSource,
    room_id: String,
    since: Option<String>,
}

#[cfg(feature = "trigger-matrix")]
impl MatrixSource {
    pub(crate) fn new(room_id: String, base_url: String) -> Result<Self> {
        Ok(Self {
            client: JsonPollingSource::new(base_url)?,
            room_id,
            since: None,
        })
    }
}

#[cfg(feature = "trigger-matrix")]
impl TriggerSource for MatrixSource {
    fn next_event(&mut self, _cx: &mut Cx, timeout: Duration) -> Result<Option<Vec<u8>>> {
        if let Some(event) = self.client.pending.pop_front() {
            return Ok(Some(event));
        }
        let mut path = format!(
            "{}/sync?timeout={}",
            self.client.base_path,
            timeout.as_millis()
        );
        if let Some(since) = &self.since {
            path.push_str("&since=");
            path.push_str(since);
        }
        let body = self.client.get(&path, timeout)?;
        let value: serde_json::Value =
            serde_json::from_slice(&body).map_err(|error| Error::HostError(error.to_string()))?;
        self.since = value
            .get("next_batch")
            .and_then(serde_json::Value::as_str)
            .map(ToOwned::to_owned);
        if let Some(events) = value.pointer(&format!(
            "/rooms/join/{}/timeline/events",
            self.room_id.replace('/', "~1")
        )) && let Some(events) = events.as_array()
        {
            for item in events {
                self.client.pending.push_back(
                    serde_json::to_vec(item)
                        .map_err(|error| Error::HostError(error.to_string()))?,
                );
            }
        }
        Ok(self.client.pending.pop_front())
    }

    #[cfg(test)]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

#[cfg(any(feature = "trigger-telegram", feature = "trigger-matrix"))]
struct JsonPollingSource {
    host: String,
    port: u16,
    base_path: String,
    pending: VecDeque<Vec<u8>>,
}

#[cfg(any(feature = "trigger-telegram", feature = "trigger-matrix"))]
impl JsonPollingSource {
    fn new(base_url: String) -> Result<Self> {
        #[cfg(feature = "server-net-http")]
        {
            let parsed = crate::http::parse_url(&base_url, "http", "")?;
            Ok(Self {
                host: parsed.host,
                port: parsed.port,
                base_path: parsed.path.trim_end_matches('/').to_owned(),
                pending: VecDeque::new(),
            })
        }
        #[cfg(not(feature = "server-net-http"))]
        {
            let _ = base_url;
            Err(Error::Eval(
                "http polling triggers require feature server-net-http".to_owned(),
            ))
        }
    }

    fn get(&self, path: &str, timeout: Duration) -> Result<Vec<u8>> {
        #[cfg(feature = "server-net-http")]
        {
            let mut stream = connect_with_timeout(&self.host, self.port, timeout)?;
            stream.set_read_timeout(Some(timeout)).map_err(io_to_host)?;
            stream
                .set_write_timeout(Some(timeout))
                .map_err(io_to_host)?;
            crate::http::write_request(
                &mut stream,
                &crate::http::HttpRequest {
                    method: "GET".to_owned(),
                    path: path.to_owned(),
                    headers: vec![("Host".to_owned(), self.host.clone())],
                    body: Vec::new(),
                },
            )?;
            let response = crate::http::read_response(&mut stream)?;
            if response.status != 200 {
                return Err(Error::Eval(format!("http status {}", response.status)));
            }
            Ok(response.body)
        }
        #[cfg(not(feature = "server-net-http"))]
        {
            let _ = (path, timeout);
            Err(Error::Eval(
                "http polling triggers require feature server-net-http".to_owned(),
            ))
        }
    }
}

#[cfg(feature = "trigger-telegram")]
fn json_id_text(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::String(text) => text.clone(),
        serde_json::Value::Number(number) => number.to_string(),
        _ => String::new(),
    }
}