enigma-protocol 0.1.0

High-level orchestrator that composes the Enigma crates into a production-ready messaging protocol
Documentation
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use bytes::Bytes;
use enigma_identity::LocalIdentity;
use enigma_packet::{Message, MessageMeta, MessageType};
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;

use crate::attachment::{build_attachment_chunks, build_attachment_end, build_attachment_init};
use crate::error::{EnigmaProtocolError, Result};
use crate::session::{AttachmentUpdate, Session};
use crate::transport::Transport;
use crate::types::{AttachmentKind, ClientEvent, SessionBootstrap};

pub struct MessengerClient {
    local: LocalIdentity,
    transport: Arc<dyn Transport>,
    sessions: HashMap<String, Session>,
    events: VecDeque<ClientEvent>,
}

impl MessengerClient {
    pub fn new(local: LocalIdentity, transport: Arc<dyn Transport>) -> Self {
        Self {
            local,
            transport,
            sessions: HashMap::new(),
            events: VecDeque::new(),
        }
    }

    pub fn ensure_session_with(&mut self, remote: &str, bootstrap: SessionBootstrap) -> Result<()> {
        if self.sessions.contains_key(remote) {
            return Ok(());
        }
        let local_user = self.local.user().username.clone();
        let session = Session::new(&local_user, remote, &bootstrap)?;
        self.sessions.insert(remote.to_string(), session);
        Ok(())
    }

    pub async fn send_text(&mut self, remote: &str, text_utf8: &str) -> Result<Uuid> {
        let session = self
            .sessions
            .get_mut(remote)
            .ok_or(EnigmaProtocolError::UnknownSession)?;
        let message = Message {
            id: Uuid::new_v4(),
            sender: session.local_id().to_string(),
            receiver: session.remote_id().to_string(),
            timestamp_ms: now_ms(),
            msg_type: MessageType::Text,
            payload: text_utf8.as_bytes().to_vec(),
            meta: MessageMeta::Basic {
                content_type: Some("text/plain".to_string()),
            },
        };
        let ciphertext = session.encrypt_message(&message)?;
        self.transport.send(Bytes::from(ciphertext)).await?;
        Ok(message.id)
    }

    pub async fn send_attachment_bytes(
        &mut self,
        remote: &str,
        kind: AttachmentKind,
        filename: Option<&str>,
        content_type: Option<&str>,
        bytes: &[u8],
        chunk_size: usize,
    ) -> Result<Uuid> {
        if chunk_size == 0 {
            return Err(EnigmaProtocolError::Attachment);
        }
        let transport = Arc::clone(&self.transport);
        let session = self
            .sessions
            .get_mut(remote)
            .ok_or(EnigmaProtocolError::UnknownSession)?;
        let chunk_size_u32 =
            u32::try_from(chunk_size).map_err(|_| EnigmaProtocolError::Attachment)?;
        let total_size =
            u64::try_from(bytes.len()).map_err(|_| EnigmaProtocolError::SizeLimitExceeded)?;
        let chunk_count = ((bytes.len() + chunk_size - 1) / chunk_size) as u32;
        let attachment_id = Uuid::new_v4();
        let timestamp = now_ms();
        let init = build_attachment_init(
            session.local_id(),
            session.remote_id(),
            attachment_id,
            kind,
            filename,
            content_type,
            total_size,
            chunk_size_u32,
            chunk_count,
            timestamp,
        );
        let mut ciphertext = session.encrypt_message(&init)?;
        transport.send(Bytes::from(ciphertext)).await?;
        let chunks = build_attachment_chunks(
            session.local_id(),
            session.remote_id(),
            attachment_id,
            bytes,
            chunk_size,
            timestamp,
        );
        for chunk in chunks {
            ciphertext = session.encrypt_message(&chunk)?;
            transport.send(Bytes::from(ciphertext)).await?;
        }
        let end_ts = timestamp.saturating_add(chunk_count as u64 + 1);
        let end = build_attachment_end(
            session.local_id(),
            session.remote_id(),
            attachment_id,
            total_size,
            chunk_count,
            end_ts,
        );
        ciphertext = session.encrypt_message(&end)?;
        transport.send(Bytes::from(ciphertext)).await?;
        Ok(attachment_id)
    }

    pub async fn poll_once(&mut self) -> Result<Option<ClientEvent>> {
        if let Some(event) = self.events.pop_front() {
            return Ok(Some(event));
        }
        let bytes = self.transport.recv().await?;
        let session_key = self.sole_session_key()?;
        let (message, update, remote_label) = {
            let session = self
                .sessions
                .get_mut(&session_key)
                .ok_or(EnigmaProtocolError::InvalidState)?;
            let msg = session.decrypt_packet(&bytes)?;
            let attachment_update = session.handle_attachment_message(&msg)?;
            let label = session.remote_id().to_string();
            (msg, attachment_update, label)
        };
        self.events.push_back(ClientEvent::MessageReceived {
            from: remote_label.clone(),
            message: message.clone(),
        });
        if let Some(update) = update {
            match update {
                AttachmentUpdate::Init {
                    attachment_id,
                    total_chunks,
                } => self.events.push_back(ClientEvent::AttachmentProgress {
                    from: remote_label.clone(),
                    attachment_id,
                    received_chunks: 0,
                    total_chunks,
                }),
                AttachmentUpdate::Chunk {
                    attachment_id,
                    received_chunks,
                    total_chunks,
                } => self.events.push_back(ClientEvent::AttachmentProgress {
                    from: remote_label.clone(),
                    attachment_id,
                    received_chunks,
                    total_chunks,
                }),
                AttachmentUpdate::End {
                    attachment_id,
                    total_size,
                } => self.events.push_back(ClientEvent::AttachmentCompleted {
                    from: remote_label.clone(),
                    attachment_id,
                    total_size,
                }),
            }
        }
        Ok(self.events.pop_front())
    }

    fn sole_session_key(&self) -> Result<String> {
        if self.sessions.len() != 1 {
            return Err(EnigmaProtocolError::InvalidState);
        }
        self.sessions
            .keys()
            .next()
            .cloned()
            .ok_or(EnigmaProtocolError::InvalidState)
    }

    pub async fn close(&self) -> Result<()> {
        self.transport.close().await
    }
}

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}