use std::{
any::Any,
fmt::Debug,
io::{Read, Write},
sync::Arc,
};
use lunatic_networking_api::{TcpConnection, TlsConnection};
use tokio::net::UdpSocket;
use crate::runtimes::wasmtime::WasmtimeCompiledModule;
/// Type-erased value that is `Send + Sync`; `DataMessage` stores these behind an `Arc` in its
/// `resources` list and recovers the concrete type by downcasting.
pub type Resource = dyn Any + Send + Sync;
/// A message that is either a data payload or a notification that a link or a process died.
#[derive(Debug)]
pub enum Message {
/// A data payload; see `DataMessage`.
Data(DataMessage),
/// A link died; carries the optional `i64` tag that `tag()` reports for this variant.
LinkDied(Option<i64>),
/// A process died; carries the `u64` that `process_id()` reports for this variant.
ProcessDied(u64),
}
impl Message {
pub fn tag(&self) -> Option<i64> {
match self {
Message::Data(message) => message.tag,
Message::LinkDied(tag) => *tag,
Message::ProcessDied(_) => None,
}
}
pub fn process_id(&self) -> Option<u64> {
match self {
Message::Data(_) => None,
Message::LinkDied(_) => None,
Message::ProcessDied(process_id) => Some(*process_id),
}
}
#[cfg(feature = "metrics")]
pub fn write_metrics(&self) {
match self {
Message::Data(message) => message.write_metrics(),
Message::LinkDied(_) => {
metrics::increment_counter!("lunatic.process.messages.link_died.count");
}
Message::ProcessDied(_) => {}
}
}
}
/// A tagged message made of a byte buffer plus a list of attached resources.
///
/// The `Read` and `Write` implementations in this file operate on `buffer`: writes append to
/// it and reads start at `read_ptr`.
#[derive(Debug, Default)]
pub struct DataMessage {
/// Optional tag; this is what `Message::tag` returns for data messages.
pub tag: Option<i64>,
/// Offset into `buffer` at which the next read starts.
pub read_ptr: usize,
/// Raw bytes of the message.
pub buffer: Vec<u8>,
/// Attached resources; a slot becomes `None` once its resource has been taken out.
pub resources: Vec<Option<Arc<Resource>>>,
}
impl DataMessage {
    /// Creates an empty message with the given `tag` whose byte buffer has capacity reserved
    /// for `buffer_capacity` bytes.
    pub fn new(tag: Option<i64>, buffer_capacity: usize) -> Self {
        Self::new_from_vec(tag, Vec::with_capacity(buffer_capacity))
    }

    /// Creates a message with the given `tag` that takes ownership of `buffer` as its
    /// contents. The read position starts at 0 and no resources are attached.
    pub fn new_from_vec(tag: Option<i64>, buffer: Vec<u8>) -> Self {
        Self {
            resources: Vec::new(),
            read_ptr: 0,
            buffer,
            tag,
        }
    }

    /// Attaches `resource` to the message and returns the index of the slot it occupies.
    pub fn add_resource(&mut self, resource: Arc<Resource>) -> usize {
        // The new element lands at the current length, which is therefore its index.
        let slot_index = self.resources.len();
        self.resources.push(Some(resource));
        slot_index
    }

    /// Takes the resource at `index` out of the message if it is a compiled module.
    ///
    /// Returns `None` (leaving the slot untouched) if the index is out of range, the slot is
    /// already empty, or the resource has a different type.
    pub fn take_module<T: 'static>(
        &mut self,
        index: usize,
    ) -> Option<Arc<WasmtimeCompiledModule<T>>> {
        self.take_downcast::<WasmtimeCompiledModule<T>>(index)
    }

    /// Takes the resource at `index` out of the message if it is a [`TcpConnection`].
    ///
    /// Returns `None` (leaving the slot untouched) if the index is out of range, the slot is
    /// already empty, or the resource has a different type.
    pub fn take_tcp_stream(&mut self, index: usize) -> Option<Arc<TcpConnection>> {
        self.take_downcast::<TcpConnection>(index)
    }

    /// Takes the resource at `index` out of the message if it is a [`UdpSocket`].
    ///
    /// Returns `None` (leaving the slot untouched) if the index is out of range, the slot is
    /// already empty, or the resource has a different type.
    pub fn take_udp_socket(&mut self, index: usize) -> Option<Arc<UdpSocket>> {
        self.take_downcast::<UdpSocket>(index)
    }

    /// Takes the resource at `index` out of the message if it is a [`TlsConnection`].
    ///
    /// Returns `None` (leaving the slot untouched) if the index is out of range, the slot is
    /// already empty, or the resource has a different type.
    pub fn take_tls_stream(&mut self, index: usize) -> Option<Arc<TlsConnection>> {
        self.take_downcast::<TlsConnection>(index)
    }

    /// Moves the read position to `index`.
    ///
    /// The position is not validated here; a position past the end of the buffer is reported
    /// as an error by the next `read`.
    pub fn seek(&mut self, index: usize) {
        self.read_ptr = index;
    }

    /// Returns the number of bytes currently held in the message buffer.
    pub fn size(&self) -> usize {
        self.buffer.len()
    }

    /// Records metrics for this data message: a message counter, a histogram of the number of
    /// resource slots, and a histogram of the buffer size in bytes.
    #[cfg(feature = "metrics")]
    pub fn write_metrics(&self) {
        let resource_slots = self.resources.len() as f64;
        let byte_count = self.size() as f64;
        metrics::increment_counter!("lunatic.process.messages.data.count");
        metrics::histogram!(
            "lunatic.process.messages.data.resources.count",
            resource_slots
        );
        metrics::histogram!("lunatic.process.messages.data.size", byte_count);
    }

    /// Takes the resource at `index` and downcasts it to `T`.
    ///
    /// Returns `None` if `index` is out of range or the slot is empty. If the resource is not
    /// a `T`, it is put back into its slot and `None` is returned, so a failed attempt never
    /// loses the resource.
    fn take_downcast<T: Send + Sync + 'static>(&mut self, index: usize) -> Option<Arc<T>> {
        let slot = self.resources.get_mut(index)?;
        match slot.take()?.downcast::<T>() {
            Ok(typed) => Some(typed),
            Err(untyped) => {
                // Wrong type requested: restore the resource so a later call can still take it.
                *slot = Some(untyped);
                None
            }
        }
    }
}
impl Write for DataMessage {
    /// Appends all of `buf` to the end of the message buffer and reports every byte as
    /// written. This never returns an error.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let written = buf.len();
        self.buffer.extend_from_slice(buf);
        Ok(written)
    }

    /// Bytes go straight into the buffer on `write`, so there is nothing to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
impl Read for DataMessage {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
let slice = if let Some(slice) = self.buffer.get(self.read_ptr..) {
slice
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::OutOfMemory,
"Reading outside message buffer",
));
};
let bytes = buf.write(slice)?;
self.read_ptr += bytes;
Ok(bytes)
}
}