use crate::{
demi_sgarray_t,
demikernel::{
config::Config,
libos::network::libos::SharedNetworkLibOS,
},
inetstack::{
test_helpers::SharedTestPhysicalLayer,
SharedInetStack,
},
runtime::{
fail::Fail,
memory::{
DemiBuffer,
MemoryRuntime,
},
network::types::MacAddress,
OperationResult,
QDesc,
QToken,
SharedDemiRuntime,
SharedObject,
},
};
use ::socket2::{
Domain,
Protocol,
Type,
};
use ::std::{
collections::{
HashMap,
VecDeque,
},
net::{
Ipv4Addr,
SocketAddrV4,
},
ops::{
Deref,
DerefMut,
},
time::{
Duration,
Instant,
},
};
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
#[derive(Clone)]
pub struct Engine {
libos: SharedNetworkLibOS<SharedInetStack>,
layer1_endpoint: SharedTestPhysicalLayer,
}
pub struct SharedEngine(SharedObject<Engine>);
impl SharedEngine {
pub fn new(config_path: &str, layer1_endpoint: SharedTestPhysicalLayer, now: Instant) -> Result<Self, Fail> {
let config: Config = Config::new(config_path.to_string())?;
let runtime: SharedDemiRuntime = SharedDemiRuntime::new(now);
let transport: SharedInetStack = SharedInetStack::new_test(&config, runtime.clone(), layer1_endpoint.clone())?;
Ok(Self(SharedObject::new(Engine {
libos: SharedNetworkLibOS::<SharedInetStack>::new(config.local_ipv4_addr()?, runtime, transport),
layer1_endpoint,
})))
}
pub fn pop_frame(&mut self) -> DemiBuffer {
self.layer1_endpoint.pop_frame()
}
pub fn pop_all_frames(&mut self) -> VecDeque<DemiBuffer> {
self.layer1_endpoint.pop_all_frames()
}
pub fn advance_clock(&mut self, now: Instant) {
self.libos.get_runtime().advance_clock(now)
}
pub fn push_frame(&mut self, bytes: DemiBuffer) {
self.layer1_endpoint.push_frame(bytes);
self.libos.get_runtime().poll();
self.libos.get_runtime().poll();
}
pub async fn ipv4_ping(&mut self, dest_ipv4_addr: Ipv4Addr, timeout: Option<Duration>) -> Result<Duration, Fail> {
self.libos.get_transport().ping(dest_ipv4_addr, timeout).await
}
pub fn udp_pushto(&mut self, qd: QDesc, buf: DemiBuffer, to: SocketAddrV4) -> Result<QToken, Fail> {
let data: demi_sgarray_t = self.libos.get_transport().into_sgarray(buf)?;
self.libos.pushto(qd, &data, to.into())
}
pub fn udp_pop(&mut self, qd: QDesc) -> Result<QToken, Fail> {
self.libos.pop(qd, None)
}
pub fn udp_socket(&mut self) -> Result<QDesc, Fail> {
self.libos.socket(Domain::IPV4, Type::DGRAM, Protocol::UDP)
}
pub fn udp_bind(&mut self, socket_fd: QDesc, endpoint: SocketAddrV4) -> Result<(), Fail> {
self.libos.bind(socket_fd, endpoint.into())
}
pub fn udp_close(&mut self, socket_fd: QDesc) -> Result<(), Fail> {
let qt = self.libos.async_close(socket_fd)?;
match self.wait(qt, DEFAULT_TIMEOUT)? {
(_, OperationResult::Close) => Ok(()),
_ => unreachable!("close did not succeed"),
}
}
pub fn tcp_socket(&mut self) -> Result<QDesc, Fail> {
self.libos.socket(Domain::IPV4, Type::STREAM, Protocol::TCP)
}
pub fn tcp_connect(&mut self, socket_fd: QDesc, remote_endpoint: SocketAddrV4) -> Result<QToken, Fail> {
self.libos.connect(socket_fd, remote_endpoint.into())
}
pub fn tcp_bind(&mut self, socket_fd: QDesc, endpoint: SocketAddrV4) -> Result<(), Fail> {
self.libos.bind(socket_fd, endpoint.into())
}
pub fn tcp_accept(&mut self, fd: QDesc) -> Result<QToken, Fail> {
self.libos.accept(fd)
}
pub fn tcp_push(&mut self, socket_fd: QDesc, buf: DemiBuffer) -> Result<QToken, Fail> {
let data: demi_sgarray_t = self.libos.get_transport().into_sgarray(buf)?;
self.libos.push(socket_fd, &data)
}
pub fn tcp_pop(&mut self, socket_fd: QDesc) -> Result<QToken, Fail> {
self.libos.pop(socket_fd, None)
}
pub fn tcp_async_close(&mut self, socket_fd: QDesc) -> Result<QToken, Fail> {
self.libos.async_close(socket_fd)
}
pub fn tcp_listen(&mut self, socket_fd: QDesc, backlog: usize) -> Result<(), Fail> {
self.libos.listen(socket_fd, backlog)
}
pub async fn arp_query(self, ipv4_addr: Ipv4Addr) -> Result<MacAddress, Fail> {
self.libos.get_transport().arp_query(ipv4_addr).await
}
pub fn export_arp_cache(&self) -> HashMap<Ipv4Addr, MacAddress> {
self.libos.get_transport().export_arp_cache()
}
pub fn poll(&self) {
self.libos.get_runtime().poll()
}
pub fn wait(&self, qt: QToken, timeout: Duration) -> Result<(QDesc, OperationResult), Fail> {
if let Some(result) = self.libos.get_runtime().get_completed_task(&qt) {
return Ok(result);
}
let qt_array: [QToken; 1] = [qt];
let mut prev: Instant = Instant::now();
let mut remaining_time: Duration = timeout;
loop {
if let Some((offset, qd, qr)) = self.libos.get_runtime().run_any(&qt_array, remaining_time) {
debug_assert_eq!(offset, 0);
return Ok((qd, qr));
}
let now: Instant = Instant::now();
let elapsed_time: Duration = now - prev;
if elapsed_time >= remaining_time {
break;
} else {
remaining_time = remaining_time - elapsed_time;
prev = now;
}
}
Err(Fail::new(libc::ETIMEDOUT, "wait timed out"))
}
pub fn get_runtime(&self) -> SharedDemiRuntime {
self.libos.get_runtime().clone()
}
pub fn get_transport(&self) -> SharedInetStack {
self.libos.get_transport().clone()
}
}
impl Deref for SharedEngine {
type Target = Engine;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SharedEngine {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}