use crate::common::runtime::SharedDummyRuntime;
use ::crossbeam_channel::{
Receiver,
Sender,
};
use ::demikernel::{
demi_sgarray_t,
demikernel::{
config::Config,
libos::network::libos::SharedNetworkLibOS,
},
inetstack::{
protocols::MAX_HEADER_SIZE,
SharedInetStack,
},
runtime::{
fail::Fail,
logging,
memory::{
DemiBuffer,
MemoryRuntime,
},
QDesc,
QToken,
SharedDemiRuntime,
},
OperationResult,
};
use ::std::{
ops::{
Deref,
DerefMut,
},
time::{
Duration,
Instant,
},
};
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1);
pub struct DummyLibOS(SharedNetworkLibOS<SharedInetStack>);
impl DummyLibOS {
pub fn new_test(config_path: &str, tx: Sender<DemiBuffer>, rx: Receiver<DemiBuffer>) -> Result<Self, Fail> {
let config: Config = Config::new(config_path.to_string())?;
let runtime: SharedDemiRuntime = SharedDemiRuntime::default();
let network: SharedDummyRuntime = SharedDummyRuntime::new(rx, tx);
logging::initialize();
let transport = SharedInetStack::new_test(&config, runtime.clone(), network)?;
Ok(Self(SharedNetworkLibOS::<SharedInetStack>::new(
config.local_ipv4_addr()?,
runtime,
transport,
)))
}
pub fn cook_data(&self, size: usize) -> Result<demi_sgarray_t, Fail> {
let fill_char: u8 = b'a';
let mut buf: DemiBuffer = DemiBuffer::new_with_headroom(size as u16, MAX_HEADER_SIZE as u16);
for a in &mut buf[..] {
*a = fill_char;
}
let data: demi_sgarray_t = self.get_transport().into_sgarray(buf)?;
Ok(data)
}
#[allow(dead_code)]
pub fn wait(&mut self, qt: QToken, timeout: Option<Duration>) -> Result<(QDesc, OperationResult), Fail> {
if let Some(result) = self.get_runtime().get_completed_task(&qt) {
return Ok(result);
}
let qt_array: [QToken; 1] = [qt];
let mut prev: Instant = Instant::now();
let mut remaining_time: Duration = timeout.unwrap_or(DEFAULT_TIMEOUT);
loop {
if let Some((offset, qd, qr)) = self.get_runtime().run_any(&qt_array, remaining_time) {
debug_assert_eq!(offset, 0);
return Ok((qd, qr));
}
let now: Instant = Instant::now();
let elapsed_time: Duration = now - prev;
if elapsed_time >= remaining_time {
break;
} else {
remaining_time = remaining_time - elapsed_time;
prev = now;
}
}
Err(Fail::new(libc::ETIMEDOUT, "wait timed out"))
}
}
impl Deref for DummyLibOS {
type Target = SharedNetworkLibOS<SharedInetStack>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for DummyLibOS {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}