#[cfg(test)]
use crate::runtime::network::types::MacAddress;
use crate::{
demi_sgarray_t,
demikernel::config::Config,
inetstack::protocols::layer4::{
Peer,
Socket,
},
runtime::{
fail::Fail,
memory::{
DemiBuffer,
MemoryRuntime,
},
network::{
socket::option::SocketOption,
transport::NetworkTransport,
},
poll_yield,
SharedDemiRuntime,
SharedObject,
},
};
use ::socket2::{
Domain,
Type,
};
#[cfg(test)]
use ::std::{
collections::HashMap,
hash::RandomState,
net::Ipv4Addr,
time::Duration,
};
use protocols::{
layer1::PhysicalLayer,
layer2::SharedLayer2Endpoint,
layer3::SharedLayer3Endpoint,
};
use ::futures::FutureExt;
use ::std::{
fmt::Debug,
net::{
SocketAddr,
SocketAddrV4,
},
ops::{
Deref,
DerefMut,
},
};
use crate::timer;
#[cfg(test)]
pub mod test_helpers;
pub mod options;
pub mod protocols;
/// Number of consecutive `poll_once` calls that `SharedInetStack::poll` makes on the layer-4 endpoint before it
/// yields through `poll_yield`.
const MAX_RECV_ITERS: usize = 2;
/// State of the network stack: a runtime handle plus the layer-4 peer that the operations in this file are
/// forwarded to.
pub struct InetStack {
/// Runtime handle. Stored as a clone of the runtime passed to `SharedInetStack::new_test` and handed back by
/// `get_runtime`.
runtime: SharedDemiRuntime,
/// Layer-4 endpoint. Socket, memory and polling calls made on `SharedInetStack` are delegated to it.
layer4_endpoint: Peer,
}
/// Cloneable handle around an `InetStack` stored in a `SharedObject`.
///
/// `new_test` clones this handle so that one copy can be moved into the background polling coroutine while the
/// other is returned to the caller.
// NOTE(review): presumably every clone refers to the same underlying `InetStack` -- confirm against `SharedObject`.
#[derive(Clone)]
pub struct SharedInetStack(SharedObject<InetStack>);
impl SharedInetStack {
    /// Builds a network stack on top of `layer1_endpoint`.
    ///
    /// All arguments are forwarded unchanged to [`SharedInetStack::new_test`] and its result is returned as is.
    // NOTE(review): despite its `_test` suffix, `new_test` is the only construction path visible in this file.
    pub fn new<P: PhysicalLayer>(
        config: &Config,
        runtime: SharedDemiRuntime,
        layer1_endpoint: P,
    ) -> Result<Self, Fail> {
        Self::new_test(config, runtime, layer1_endpoint)
    }

    /// Assembles the stack bottom-up and registers its polling coroutine with `runtime`.
    ///
    /// The layer-2 endpoint wraps `layer1_endpoint`, the layer-3 endpoint wraps layer 2, and the layer-4 peer wraps
    /// layer 3. The resulting handle is cloned once: the clone drives [`SharedInetStack::poll`] as the background
    /// coroutine named `"bgc::inetstack::poll_recv"`, and the original is returned.
    ///
    /// # Errors
    ///
    /// Propagates any [`Fail`] returned while constructing a layer or while inserting the background coroutine.
    pub fn new_test<P: PhysicalLayer>(
        config: &Config,
        mut runtime: SharedDemiRuntime,
        layer1_endpoint: P,
    ) -> Result<Self, Fail> {
        // NOTE(review): the seed is a fixed all-zero array, so whatever layers 3 and 4 derive from it is presumably
        // identical on every run -- confirm this is intended outside of tests.
        let seed: [u8; 32] = [0u8; 32];

        // Each layer takes ownership of the one below it, so they must be created in this order.
        let link: SharedLayer2Endpoint = SharedLayer2Endpoint::new(config, layer1_endpoint)?;
        let network: SharedLayer3Endpoint = SharedLayer3Endpoint::new(config, runtime.clone(), link, seed)?;
        let layer4_endpoint: Peer = Peer::new(config, runtime.clone(), network, seed)?;

        let inner: InetStack = InetStack {
            runtime: runtime.clone(),
            layer4_endpoint,
        };
        let stack: Self = Self(SharedObject::new(inner));

        // The polling loop consumes its receiver, so it gets its own clone of the handle.
        let receive_loop = Box::pin(stack.clone().poll().fuse());
        runtime.insert_background_coroutine("bgc::inetstack::poll_recv", receive_loop)?;

        Ok(stack)
    }

    /// Endless polling loop: calls `poll_once` on the layer-4 peer [`MAX_RECV_ITERS`] times, yields through
    /// `poll_yield`, and starts over. This future never completes on its own.
    // NOTE(review): `timer!` is invoked once, outside the loop; what it measures for a function that never returns
    // depends on how the macro is defined -- confirm.
    pub async fn poll(mut self) {
        timer!("inetstack::poll");
        loop {
            for _round in 0..MAX_RECV_ITERS {
                self.layer4_endpoint.poll_once();
            }
            poll_yield().await;
        }
    }

    /// Test-only: forwards `addr` and `timeout` to the layer-4 peer's `ping` and awaits the outcome.
    #[cfg(test)]
    pub async fn ping(&mut self, addr: Ipv4Addr, timeout: Option<Duration>) -> Result<Duration, Fail> {
        self.layer4_endpoint.ping(addr, timeout).await
    }

    /// Test-only: forwards `addr` to the layer-4 peer's `arp_query` and awaits the outcome.
    #[cfg(test)]
    pub async fn arp_query(&mut self, addr: Ipv4Addr) -> Result<MacAddress, Fail> {
        self.layer4_endpoint.arp_query(addr).await
    }

    /// Test-only: returns whatever the layer-4 peer's `export_arp_cache` produces.
    #[cfg(test)]
    pub fn export_arp_cache(&self) -> HashMap<Ipv4Addr, MacAddress, RandomState> {
        self.layer4_endpoint.export_arp_cache()
    }
}
/// Lets a [`SharedInetStack`] be read as the [`InetStack`] it wraps.
impl Deref for SharedInetStack {
    type Target = InetStack;

    /// Borrows the wrapped [`InetStack`] by dereferencing the inner shared object.
    fn deref(&self) -> &InetStack {
        Deref::deref(&self.0)
    }
}
impl DerefMut for SharedInetStack {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}
/// `NetworkTransport` implementation for the shared network stack.
///
/// Every method except `get_runtime` forwards its arguments unchanged to the method of the same name on the
/// layer-4 peer (`layer4_endpoint`, reached through this type's `Deref`/`DerefMut` impls) and returns that
/// method's result.
impl NetworkTransport for SharedInetStack {
/// Socket handle type used by this transport.
type SocketDescriptor = Socket;
/// Forwards `domain` and `typ` to the layer-4 peer's `socket`.
fn socket(&mut self, domain: Domain, typ: Type) -> Result<Self::SocketDescriptor, Fail> {
self.layer4_endpoint.socket(domain, typ)
}
/// Forwards `sd` and `option` to the layer-4 peer's `set_socket_option`.
fn set_socket_option(&mut self, sd: &mut Self::SocketDescriptor, option: SocketOption) -> Result<(), Fail> {
self.layer4_endpoint.set_socket_option(sd, option)
}
/// Forwards `sd` and `option` to the layer-4 peer's `get_socket_option` and returns the `SocketOption` it yields.
fn get_socket_option(
&mut self,
sd: &mut Self::SocketDescriptor,
option: SocketOption,
) -> Result<SocketOption, Fail> {
self.layer4_endpoint.get_socket_option(sd, option)
}
/// Forwards `sd` to the layer-4 peer's `getpeername`.
fn getpeername(&mut self, sd: &mut Self::SocketDescriptor) -> Result<SocketAddrV4, Fail> {
self.layer4_endpoint.getpeername(sd)
}
/// Forwards `sd` and `local` to the layer-4 peer's `bind`.
fn bind(&mut self, sd: &mut Self::SocketDescriptor, local: SocketAddr) -> Result<(), Fail> {
self.layer4_endpoint.bind(sd, local)
}
/// Forwards `sd` and `backlog` to the layer-4 peer's `listen`.
fn listen(&mut self, sd: &mut Self::SocketDescriptor, backlog: usize) -> Result<(), Fail> {
self.layer4_endpoint.listen(sd, backlog)
}
/// Awaits the layer-4 peer's `accept` on `sd`; on success yields a socket descriptor paired with a `SocketAddr`.
async fn accept(&mut self, sd: &mut Self::SocketDescriptor) -> Result<(Self::SocketDescriptor, SocketAddr), Fail> {
self.layer4_endpoint.accept(sd).await
}
/// Awaits the layer-4 peer's `connect` with `sd` and `remote`.
async fn connect(&mut self, sd: &mut Self::SocketDescriptor, remote: SocketAddr) -> Result<(), Fail> {
self.layer4_endpoint.connect(sd, remote).await
}
/// Awaits the layer-4 peer's `close` on `sd`.
async fn close(&mut self, sd: &mut Self::SocketDescriptor) -> Result<(), Fail> {
self.layer4_endpoint.close(sd).await
}
/// Forwards `sd` to the layer-4 peer's `hard_close`; unlike `close`, this call is synchronous.
fn hard_close(&mut self, sd: &mut Self::SocketDescriptor) -> Result<(), Fail> {
self.layer4_endpoint.hard_close(sd)
}
/// Invokes `timer!("inetstack::push")`, then awaits the layer-4 peer's `push` with `sd`, `buf` and `addr`.
// NOTE(review): `pop` below has no matching `timer!`; confirm whether that asymmetry is intended.
async fn push(
&mut self,
sd: &mut Self::SocketDescriptor,
buf: &mut DemiBuffer,
addr: Option<SocketAddr>,
) -> Result<(), Fail> {
timer!("inetstack::push");
self.layer4_endpoint.push(sd, buf, addr).await
}
/// Awaits the layer-4 peer's `pop` with `sd` and `size`; on success yields an optional `SocketAddr` together with
/// a `DemiBuffer`.
async fn pop(
&mut self,
sd: &mut Self::SocketDescriptor,
size: usize,
) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
self.layer4_endpoint.pop(sd, size).await
}
/// Returns a reference to the runtime handle stored in the stack's `runtime` field.
fn get_runtime(&self) -> &SharedDemiRuntime {
&self.runtime
}
}
/// `MemoryRuntime` implementation for the shared network stack.
///
/// Each method forwards its argument unchanged to the method of the same name on the layer-4 peer
/// (`layer4_endpoint`) and returns that method's result.
impl MemoryRuntime for SharedInetStack {
/// Forwards `sga` to the layer-4 peer's `clone_sgarray`, which yields a `DemiBuffer` on success.
fn clone_sgarray(&self, sga: &demi_sgarray_t) -> Result<DemiBuffer, Fail> {
self.layer4_endpoint.clone_sgarray(sga)
}
/// Forwards `buf` (moved) to the layer-4 peer's `into_sgarray`, which yields a `demi_sgarray_t` on success.
fn into_sgarray(&self, buf: DemiBuffer) -> Result<demi_sgarray_t, Fail> {
self.layer4_endpoint.into_sgarray(buf)
}
/// Forwards `size` to the layer-4 peer's `sgaalloc`, which yields a `demi_sgarray_t` on success.
fn sgaalloc(&self, size: usize) -> Result<demi_sgarray_t, Fail> {
self.layer4_endpoint.sgaalloc(size)
}
/// Forwards `sga` (moved) to the layer-4 peer's `sgafree`.
fn sgafree(&self, sga: demi_sgarray_t) -> Result<(), Fail> {
self.layer4_endpoint.sgafree(sga)
}
}
impl Debug for Socket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Socket::Tcp(socket) => socket.fmt(f),
Socket::Udp(socket) => socket.fmt(f),
}
}
}