use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use crate::error::{Error, Result};
use crate::communication::CommunicationChannel;
use crate::utils::logging;
pub struct MessageChannel {
name: String,
host_to_guest: Mutex<VecDeque<Vec<u8>>>,
guest_to_host: Mutex<VecDeque<Vec<u8>>>,
capacity: usize,
closed: Mutex<bool>,
}
impl MessageChannel {
pub fn new(name: &str, capacity: usize) -> Self {
Self {
name: name.to_string(),
host_to_guest: Mutex::new(VecDeque::with_capacity(capacity)),
guest_to_host: Mutex::new(VecDeque::with_capacity(capacity)),
capacity,
closed: Mutex::new(false),
}
}
pub fn get_guest_interface(&self) -> GuestChannelInterface {
GuestChannelInterface {
name: self.name.clone(),
host_to_guest: self.host_to_guest.lock().unwrap().clone(),
guest_to_host: self.guest_to_host.lock().unwrap().clone(),
capacity: self.capacity,
closed: *self.closed.lock().unwrap(),
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl CommunicationChannel for MessageChannel {
fn send_to_guest(&self, message: &[u8]) -> Result<()> {
if *self.closed.lock().unwrap() {
return Err(Error::Communication {
channel: "host_to_guest".to_string(),
reason: "Channel is closed".to_string(),
instance_id: None,
});
}
let mut queue = self.host_to_guest.lock().unwrap();
if queue.len() >= self.capacity {
return Err(Error::Communication {
channel: "host_to_guest".to_string(),
reason: "Channel is full".to_string(),
instance_id: None,
});
}
queue.push_back(message.to_vec());
logging::log_communication_event(&self.name, "sent", message.len());
Ok(())
}
fn receive_from_guest(&self) -> Result<Vec<u8>> {
if *self.closed.lock().unwrap() {
return Err(Error::Communication {
channel: "guest_to_host".to_string(),
reason: "Channel is closed".to_string(),
instance_id: None,
});
}
let mut queue = self.guest_to_host.lock().unwrap();
if let Some(message) = queue.pop_front() {
logging::log_communication_event(&self.name, "received", message.len());
Ok(message)
} else {
Err(Error::Communication {
channel: "guest_to_host".to_string(),
reason: "No messages available".to_string(),
instance_id: None,
})
}
}
fn has_messages(&self) -> bool {
if *self.closed.lock().unwrap() {
return false;
}
let queue = self.guest_to_host.lock().unwrap();
!queue.is_empty()
}
fn close(&self) -> Result<()> {
let mut closed = self.closed.lock().unwrap();
*closed = true;
logging::log_communication_event(&self.name, "closed", 0);
Ok(())
}
}
pub struct GuestChannelInterface {
name: String,
#[allow(dead_code)]
host_to_guest: VecDeque<Vec<u8>>,
#[allow(dead_code)]
guest_to_host: VecDeque<Vec<u8>>,
#[allow(dead_code)]
capacity: usize,
closed: bool,
}
impl GuestChannelInterface {
pub fn send(&self, _message: &[u8]) -> Result<()> {
if self.closed {
return Err(Error::Communication {
channel: "guest_to_host".to_string(),
reason: "Channel is closed".to_string(),
instance_id: None,
});
}
Ok(())
}
pub fn receive(&self) -> Result<Vec<u8>> {
if self.closed {
return Err(Error::Communication {
channel: "host_to_guest".to_string(),
reason: "Channel is closed".to_string(),
instance_id: None,
});
}
Err(Error::Communication {
channel: "host_to_guest".to_string(),
reason: "No messages available".to_string(),
instance_id: None,
})
}
pub fn has_messages(&self) -> bool {
if self.closed {
return false;
}
false
}
pub fn close(&mut self) -> Result<()> {
self.closed = true;
Ok(())
}
pub fn name(&self) -> &str {
&self.name
}
}
pub struct ChannelFactory {
default_capacity: usize,
}
impl ChannelFactory {
pub fn new(default_capacity: usize) -> Self {
Self {
default_capacity,
}
}
pub fn create_channel(&self, name: &str) -> Arc<MessageChannel> {
Arc::new(MessageChannel::new(name, self.default_capacity))
}
pub fn create_channel_with_capacity(&self, name: &str, capacity: usize) -> Arc<MessageChannel> {
Arc::new(MessageChannel::new(name, capacity))
}
}
impl Default for ChannelFactory {
fn default() -> Self {
Self {
default_capacity: 100,
}
}
}