sim-lib-server 0.1.0

SIM workspace package for sim lib server.
Documentation
#[cfg(any(feature = "trigger-imap", feature = "trigger-smtp"))]
use std::io::{BufReader, Write};
#[cfg(feature = "trigger-webhook")]
use std::net::TcpListener;
use std::time::Duration;

use sim_kernel::{Cx, Error, Result};

use super::{TriggerSource, loopback_smtp_messages, queued_trigger_events};

mod common;
mod polling;

#[cfg(any(
    feature = "trigger-imap",
    feature = "trigger-smtp",
    feature = "trigger-telegram",
    feature = "trigger-matrix"
))]
use common::connect_with_timeout;
#[cfg(any(
    feature = "trigger-webhook",
    feature = "trigger-imap",
    feature = "trigger-smtp",
    feature = "trigger-telegram",
    feature = "trigger-matrix"
))]
use common::io_to_host;
#[cfg(any(feature = "trigger-imap", feature = "trigger-smtp"))]
use common::parse_host_port;
#[cfg(feature = "trigger-smtp")]
use common::{expect_smtp_code, smtp_command};
#[cfg(feature = "trigger-imap")]
use common::{
    first_search_result, imap_command_collect, imap_command_ok, read_imap_fetch_body,
    read_imap_line,
};

pub(crate) struct QueueSource {
    key: String,
}

impl QueueSource {
    pub(crate) fn new(key: String) -> Self {
        Self { key }
    }
}

impl TriggerSource for QueueSource {
    fn next_event(&mut self, _cx: &mut Cx, _timeout: Duration) -> Result<Option<Vec<u8>>> {
        let mut queues = queued_trigger_events()
            .lock()
            .map_err(|_| Error::PoisonedLock("trigger queue"))?;
        let Some(events) = queues.get_mut(&self.key) else {
            return Ok(None);
        };
        if events.is_empty() {
            queues.remove(&self.key);
            return Ok(None);
        }
        let event = events.remove(0);
        if events.is_empty() {
            queues.remove(&self.key);
        }
        Ok(Some(event))
    }

    #[cfg(test)]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

pub(crate) struct LoopbackSmtpSource {
    key: String,
}

impl LoopbackSmtpSource {
    pub(crate) fn new(key: String) -> Self {
        Self { key }
    }
}

impl TriggerSource for LoopbackSmtpSource {
    fn next_event(&mut self, cx: &mut Cx, timeout: Duration) -> Result<Option<Vec<u8>>> {
        let mut source = QueueSource::new(self.key.clone());
        let Some(event) = source.next_event(cx, timeout)? else {
            return Ok(None);
        };
        loopback_smtp_messages()
            .lock()
            .map_err(|_| Error::PoisonedLock("loopback smtp messages"))?
            .entry(self.key.clone())
            .or_default()
            .push(event.clone());
        Ok(Some(event))
    }

    #[cfg(test)]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

#[cfg(feature = "trigger-smtp")]
pub(crate) struct SmtpSource {
    address: String,
    key: String,
    helo_name: String,
    from: Option<String>,
    to: Option<String>,
}

#[cfg(feature = "trigger-smtp")]
impl SmtpSource {
    pub(crate) fn new(
        address: String,
        key: String,
        helo_name: Option<String>,
        from: Option<String>,
        to: Option<String>,
    ) -> Self {
        Self {
            address,
            key,
            helo_name: helo_name.unwrap_or_else(|| "sim-server".to_owned()),
            from,
            to,
        }
    }
}

#[cfg(feature = "trigger-smtp")]
impl TriggerSource for SmtpSource {
    fn next_event(&mut self, _cx: &mut Cx, timeout: Duration) -> Result<Option<Vec<u8>>> {
        let event = {
            let mut queues = queued_trigger_events()
                .lock()
                .map_err(|_| Error::PoisonedLock("trigger queue"))?;
            let Some(events) = queues.get_mut(&self.key) else {
                return Ok(None);
            };
            if events.is_empty() {
                queues.remove(&self.key);
                return Ok(None);
            }
            let event = events.remove(0);
            if events.is_empty() {
                queues.remove(&self.key);
            }
            event
        };
        let from = self
            .from
            .clone()
            .ok_or_else(|| Error::Eval("smtp trigger requires :from".to_owned()))?;
        let to = self
            .to
            .clone()
            .ok_or_else(|| Error::Eval("smtp trigger requires :to".to_owned()))?;
        let (host, port) = parse_host_port(&self.address, 25)?;
        let stream = connect_with_timeout(&host, port, timeout)?;
        stream.set_read_timeout(Some(timeout)).map_err(io_to_host)?;
        stream
            .set_write_timeout(Some(timeout))
            .map_err(io_to_host)?;
        let mut reader = BufReader::new(stream);
        expect_smtp_code(&mut reader, 220)?;
        smtp_command(&mut reader, &format!("EHLO {}\r\n", self.helo_name), 250)?;
        smtp_command(&mut reader, &format!("MAIL FROM:<{from}>\r\n"), 250)?;
        smtp_command(&mut reader, &format!("RCPT TO:<{to}>\r\n"), 250)?;
        smtp_command(&mut reader, "DATA\r\n", 354)?;
        reader.get_mut().write_all(&event).map_err(io_to_host)?;
        if !event.ends_with(b"\r\n") {
            reader.get_mut().write_all(b"\r\n").map_err(io_to_host)?;
        }
        reader.get_mut().write_all(b".\r\n").map_err(io_to_host)?;
        reader.get_mut().flush().map_err(io_to_host)?;
        expect_smtp_code(&mut reader, 250)?;
        let _ = smtp_command(&mut reader, "QUIT\r\n", 221);
        Ok(Some(event))
    }

    #[cfg(test)]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

#[cfg(feature = "trigger-imap")]
pub(crate) struct ImapSource {
    address: String,
    mailbox: String,
    user: Option<String>,
    password: Option<String>,
    pending_seen: Option<u32>,
}

#[cfg(feature = "trigger-imap")]
impl ImapSource {
    pub(crate) fn new(
        address: String,
        mailbox: String,
        user: Option<String>,
        password: Option<String>,
    ) -> Self {
        Self {
            address,
            mailbox,
            user,
            password,
            pending_seen: None,
        }
    }
}

#[cfg(feature = "trigger-imap")]
impl TriggerSource for ImapSource {
    fn next_event(&mut self, _cx: &mut Cx, timeout: Duration) -> Result<Option<Vec<u8>>> {
        let (host, port) = parse_host_port(&self.address, 143)?;
        let mut reader = BufReader::new(connect_with_timeout(&host, port, timeout)?);
        reader
            .get_mut()
            .set_read_timeout(Some(timeout))
            .map_err(io_to_host)?;
        reader
            .get_mut()
            .set_write_timeout(Some(timeout))
            .map_err(io_to_host)?;
        read_imap_line(&mut reader)?;
        let mut tag = 1u32;
        if let (Some(user), Some(password)) = (&self.user, &self.password) {
            imap_command_ok(&mut reader, &mut tag, &format!("LOGIN {user} {password}"))?;
        }
        imap_command_ok(&mut reader, &mut tag, &format!("SELECT {}", self.mailbox))?;
        let search = imap_command_collect(&mut reader, &mut tag, "SEARCH UNSEEN")?;
        let Some(message_id) = first_search_result(&search) else {
            let _ = imap_command_ok(&mut reader, &mut tag, "LOGOUT");
            return Ok(None);
        };
        let fetch_tag = format!("A{tag:04}");
        tag += 1;
        reader
            .get_mut()
            .write_all(format!("{fetch_tag} FETCH {message_id} BODY[]\r\n").as_bytes())
            .map_err(io_to_host)?;
        reader.get_mut().flush().map_err(io_to_host)?;
        let body = read_imap_fetch_body(&mut reader, &fetch_tag)?;
        self.pending_seen = Some(message_id);
        let _ = imap_command_ok(&mut reader, &mut tag, "LOGOUT");
        Ok(Some(body))
    }

    fn ack(&mut self, _cx: &mut Cx) -> Result<()> {
        let Some(message_id) = self.pending_seen.take() else {
            return Ok(());
        };
        let (host, port) = parse_host_port(&self.address, 143)?;
        let mut reader = BufReader::new(connect_with_timeout(&host, port, Duration::from_secs(1))?);
        read_imap_line(&mut reader)?;
        let mut tag = 1u32;
        if let (Some(user), Some(password)) = (&self.user, &self.password) {
            imap_command_ok(&mut reader, &mut tag, &format!("LOGIN {user} {password}"))?;
        }
        imap_command_ok(&mut reader, &mut tag, &format!("SELECT {}", self.mailbox))?;
        imap_command_ok(
            &mut reader,
            &mut tag,
            &format!("STORE {message_id} +FLAGS (\\Seen)"),
        )?;
        let _ = imap_command_ok(&mut reader, &mut tag, "LOGOUT");
        Ok(())
    }

    #[cfg(test)]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

#[cfg(feature = "trigger-webhook")]
pub(crate) struct WebhookSource {
    listener: TcpListener,
    route: String,
}

#[cfg(feature = "trigger-webhook")]
impl WebhookSource {
    pub(crate) fn bind(route: String, port: u16) -> Result<Self> {
        #[cfg(feature = "server-net-http")]
        {
            let listener = TcpListener::bind(("127.0.0.1", port)).map_err(io_to_host)?;
            listener.set_nonblocking(true).map_err(io_to_host)?;
            Ok(Self { listener, route })
        }
        #[cfg(not(feature = "server-net-http"))]
        {
            let _ = (route, port);
            Err(Error::Eval(
                "webhook trigger requires feature server-net-http".to_owned(),
            ))
        }
    }

    #[cfg(test)]
    pub(crate) fn local_port(&self) -> Result<u16> {
        self.listener
            .local_addr()
            .map(|addr| addr.port())
            .map_err(io_to_host)
    }
}

#[cfg(feature = "trigger-webhook")]
impl TriggerSource for WebhookSource {
    fn next_event(&mut self, _cx: &mut Cx, timeout: Duration) -> Result<Option<Vec<u8>>> {
        #[cfg(feature = "server-net-http")]
        {
            self.listener.set_nonblocking(true).map_err(io_to_host)?;
            let deadline = std::time::Instant::now() + timeout;
            loop {
                match self.listener.accept() {
                    Ok((mut stream, _)) => {
                        stream.set_read_timeout(Some(timeout)).map_err(io_to_host)?;
                        let response = match crate::http::read_request(&mut stream)? {
                            Some(request)
                                if request.method == "POST" && request.path == self.route =>
                            {
                                let body = request.body;
                                crate::http::write_response(
                                    &mut stream,
                                    &crate::http::HttpResponse {
                                        status: 200,
                                        headers: Vec::new(),
                                        body: Vec::new(),
                                    },
                                )?;
                                return Ok(Some(body));
                            }
                            Some(request) if request.path != self.route => {
                                crate::http::HttpResponse {
                                    status: 404,
                                    headers: Vec::new(),
                                    body: b"not found".to_vec(),
                                }
                            }
                            Some(_) => crate::http::HttpResponse {
                                status: 405,
                                headers: Vec::new(),
                                body: b"method not allowed".to_vec(),
                            },
                            None => crate::http::HttpResponse {
                                status: 400,
                                headers: Vec::new(),
                                body: b"empty request".to_vec(),
                            },
                        };
                        crate::http::write_response(&mut stream, &response)?;
                    }
                    Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
                        if std::time::Instant::now() >= deadline {
                            return Ok(None);
                        }
                        std::thread::sleep(Duration::from_millis(10));
                    }
                    Err(error) => return Err(io_to_host(error)),
                }
            }
        }
        #[cfg(not(feature = "server-net-http"))]
        {
            let _ = timeout;
            Err(Error::Eval(
                "webhook trigger requires feature server-net-http".to_owned(),
            ))
        }
    }

    #[cfg(test)]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}

#[cfg(feature = "trigger-matrix")]
pub(crate) use polling::MatrixSource;
#[cfg(feature = "trigger-telegram")]
pub(crate) use polling::TelegramSource;