use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use bytes::Bytes;
use enigma_identity::LocalIdentity;
use enigma_packet::{Message, MessageMeta, MessageType};
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use crate::attachment::{build_attachment_chunks, build_attachment_end, build_attachment_init};
use crate::error::{EnigmaProtocolError, Result};
use crate::session::{AttachmentUpdate, Session};
use crate::transport::Transport;
use crate::types::{AttachmentKind, ClientEvent, SessionBootstrap};
/// Messaging client that owns per-remote sessions, sends encrypted messages
/// through a shared transport, and queues [`ClientEvent`]s for the caller.
pub struct MessengerClient {
/// Local identity; its user's `username` is passed as the local side when a
/// session is created.
local: LocalIdentity,
/// Shared transport handle used for `send`, `recv` and `close`.
transport: Arc<dyn Transport>,
/// Established sessions, keyed by the `remote` string given to
/// `ensure_session_with`.
sessions: HashMap<String, Session>,
/// FIFO queue of events produced by `poll_once` that have not yet been
/// returned to the caller.
events: VecDeque<ClientEvent>,
}
impl MessengerClient {
/// Creates a client for `local` that communicates over `transport`.
///
/// The client starts with no sessions and an empty event queue.
pub fn new(local: LocalIdentity, transport: Arc<dyn Transport>) -> Self {
    let sessions = HashMap::new();
    let events = VecDeque::new();
    Self {
        local,
        transport,
        sessions,
        events,
    }
}
/// Makes sure a session with `remote` exists, creating one from `bootstrap`
/// when none is registered yet.
///
/// If a session for `remote` is already present this is a no-op and
/// `bootstrap` is discarded without being inspected.
///
/// # Errors
///
/// Propagates any error returned by `Session::new`.
pub fn ensure_session_with(&mut self, remote: &str, bootstrap: SessionBootstrap) -> Result<()> {
    if self.sessions.contains_key(remote) {
        return Ok(());
    }
    // Borrow the username in place: the session constructor only needs a
    // reference, so an owned copy would be a wasted allocation.
    let session = Session::new(&self.local.user().username, remote, &bootstrap)?;
    self.sessions.insert(remote.to_string(), session);
    Ok(())
}
/// Encrypts `text_utf8` as a `text/plain` message for `remote` and sends it
/// over the transport.
///
/// Returns the freshly generated id of the message that was sent.
///
/// # Errors
///
/// * `EnigmaProtocolError::UnknownSession` when no session exists for `remote`.
/// * Any error from encrypting the message or from the transport's `send`.
pub async fn send_text(&mut self, remote: &str, text_utf8: &str) -> Result<Uuid> {
    let session = match self.sessions.get_mut(remote) {
        Some(existing) => existing,
        None => return Err(EnigmaProtocolError::UnknownSession),
    };

    let outgoing = Message {
        id: Uuid::new_v4(),
        sender: session.local_id().to_string(),
        receiver: session.remote_id().to_string(),
        timestamp_ms: now_ms(),
        msg_type: MessageType::Text,
        payload: text_utf8.as_bytes().to_vec(),
        meta: MessageMeta::Basic {
            content_type: Some(String::from("text/plain")),
        },
    };

    let sealed = session.encrypt_message(&outgoing)?;
    self.transport.send(Bytes::from(sealed)).await?;
    Ok(outgoing.id)
}
/// Sends `bytes` to `remote` as a chunked attachment.
///
/// The transfer is a sequence of individually encrypted packets: one init
/// message, one message per chunk of at most `chunk_size` bytes, and one end
/// message. All of them share a newly generated attachment id, which is
/// returned on success.
///
/// Packets are sent as they are produced, so a failure part-way through
/// returns an error after earlier packets have already gone out.
///
/// # Errors
///
/// * `EnigmaProtocolError::Attachment` when `chunk_size` is zero or does not
///   fit in a `u32`.
/// * `EnigmaProtocolError::UnknownSession` when no session exists for `remote`.
/// * `EnigmaProtocolError::SizeLimitExceeded` when the payload length does not
///   fit in a `u64` or the number of chunks does not fit in a `u32`.
/// * Any error from encrypting a message or from the transport's `send`.
pub async fn send_attachment_bytes(
    &mut self,
    remote: &str,
    kind: AttachmentKind,
    filename: Option<&str>,
    content_type: Option<&str>,
    bytes: &[u8],
    chunk_size: usize,
) -> Result<Uuid> {
    if chunk_size == 0 {
        return Err(EnigmaProtocolError::Attachment);
    }
    let transport = Arc::clone(&self.transport);
    let session = self
        .sessions
        .get_mut(remote)
        .ok_or(EnigmaProtocolError::UnknownSession)?;
    let chunk_size_u32 =
        u32::try_from(chunk_size).map_err(|_| EnigmaProtocolError::Attachment)?;
    let total_size =
        u64::try_from(bytes.len()).map_err(|_| EnigmaProtocolError::SizeLimitExceeded)?;
    // Ceiling division written so the intermediate sum cannot overflow, followed
    // by a checked narrowing: a silently truncated count would be advertised to
    // the peer in the init and end messages.
    let chunk_total = bytes.len() / chunk_size + usize::from(bytes.len() % chunk_size != 0);
    let chunk_count =
        u32::try_from(chunk_total).map_err(|_| EnigmaProtocolError::SizeLimitExceeded)?;
    let attachment_id = Uuid::new_v4();
    let timestamp = now_ms();

    let init = build_attachment_init(
        session.local_id(),
        session.remote_id(),
        attachment_id,
        kind,
        filename,
        content_type,
        total_size,
        chunk_size_u32,
        chunk_count,
        timestamp,
    );
    let init_packet = session.encrypt_message(&init)?;
    transport.send(Bytes::from(init_packet)).await?;

    let chunks = build_attachment_chunks(
        session.local_id(),
        session.remote_id(),
        attachment_id,
        bytes,
        chunk_size,
        timestamp,
    );
    for chunk in chunks {
        let chunk_packet = session.encrypt_message(&chunk)?;
        transport.send(Bytes::from(chunk_packet)).await?;
    }

    // The end message is stamped `chunk_count + 1` ms after the start timestamp.
    let end_ts = timestamp.saturating_add(u64::from(chunk_count) + 1);
    let end = build_attachment_end(
        session.local_id(),
        session.remote_id(),
        attachment_id,
        total_size,
        chunk_count,
        end_ts,
    );
    let end_packet = session.encrypt_message(&end)?;
    transport.send(Bytes::from(end_packet)).await?;
    Ok(attachment_id)
}
/// Returns the next client event, receiving and processing one packet from
/// the transport if nothing is queued.
///
/// When the queue already holds an event it is returned immediately without
/// touching the transport. Otherwise one packet is received, decrypted with
/// the single registered session and turned into a `MessageReceived` event,
/// followed by an attachment progress/completed event when the message
/// belongs to an attachment transfer. The first of those events is returned
/// and the rest stay queued for later calls.
///
/// # Errors
///
/// * Any error from the transport's `recv`.
/// * `EnigmaProtocolError::InvalidState` unless exactly one session is
///   registered.
/// * Any error from decrypting the packet or handling attachment state.
pub async fn poll_once(&mut self) -> Result<Option<ClientEvent>> {
    if let Some(event) = self.events.pop_front() {
        return Ok(Some(event));
    }
    // NOTE(review): the packet is taken off the transport before the session
    // lookup, so it is dropped if the lookup fails. Order kept as-is to
    // preserve what callers currently observe.
    let bytes = self.transport.recv().await?;
    let session_key = self.sole_session_key()?;
    let (message, update, remote_label) = {
        let session = self
            .sessions
            .get_mut(&session_key)
            .ok_or(EnigmaProtocolError::InvalidState)?;
        let msg = session.decrypt_packet(&bytes)?;
        let attachment_update = session.handle_attachment_message(&msg)?;
        let label = session.remote_id().to_string();
        (msg, attachment_update, label)
    };

    // Build the optional attachment event first so the sender label is cloned
    // only when a second event actually exists.
    let follow_up = update.map(|update| {
        let from = remote_label.clone();
        match update {
            AttachmentUpdate::Init {
                attachment_id,
                total_chunks,
            } => ClientEvent::AttachmentProgress {
                from,
                attachment_id,
                received_chunks: 0,
                total_chunks,
            },
            AttachmentUpdate::Chunk {
                attachment_id,
                received_chunks,
                total_chunks,
            } => ClientEvent::AttachmentProgress {
                from,
                attachment_id,
                received_chunks,
                total_chunks,
            },
            AttachmentUpdate::End {
                attachment_id,
                total_size,
            } => ClientEvent::AttachmentCompleted {
                from,
                attachment_id,
                total_size,
            },
        }
    });

    // The message is moved, not cloned: nothing reads it after this point.
    self.events.push_back(ClientEvent::MessageReceived {
        from: remote_label,
        message,
    });
    if let Some(event) = follow_up {
        self.events.push_back(event);
    }
    Ok(self.events.pop_front())
}
/// Returns a copy of the key of the only registered session.
///
/// # Errors
///
/// `EnigmaProtocolError::InvalidState` when the number of sessions is
/// anything other than exactly one.
fn sole_session_key(&self) -> Result<String> {
    let mut keys = self.sessions.keys();
    // Exactly one key means the first pull yields it and the second is empty.
    match (keys.next(), keys.next()) {
        (Some(only), None) => Ok(only.clone()),
        _ => Err(EnigmaProtocolError::InvalidState),
    }
}
/// Closes the underlying transport.
///
/// # Errors
///
/// Returns whatever error the transport's `close` reports.
pub async fn close(&self) -> Result<()> {
    let transport = &self.transport;
    transport.close().await
}
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}