use std::{net::SocketAddr, pin::Pin, task::Poll};
use rand::{RngCore, SeedableRng};
use rhai::{Dynamic, Engine, FnPtr, NativeCallContext};
use tokio::io::AsyncRead;
use tracing::{debug, debug_span, Instrument};
use crate::scenario_executor::{
scenario::{callback_and_continue, ScenarioAccess},
types::{Handle, StreamRead, StreamSocket, StreamWrite},
utils1::TaskHandleExt2,
};
use super::{types::Task, utils1::RhResult};
/// Builds a stream socket backed by the process's standard streams.
///
/// The read half wraps `tokio::io::stdin()` (with a default-constructed
/// `prefix`), the write half wraps `tokio::io::stdout()`. The `close` and
/// `fd` fields are left as `None`.
fn stdio_socket() -> Handle<StreamSocket> {
    // Reading side: standard input.
    let stdin_half = StreamRead {
        reader: Box::pin(tokio::io::stdin()),
        prefix: Default::default(),
    };
    // Writing side: standard output.
    let stdout_half = StreamWrite {
        writer: Box::pin(tokio::io::stdout()),
    };

    StreamSocket {
        read: Some(stdin_half),
        write: Some(stdout_half),
        close: None,
        fd: None,
    }
    .wrap()
}
/// Creates a task that resolves `addr` and hands the results to `continuation`.
///
/// The returned task, when run, calls `tokio::net::lookup_host` on `addr`,
/// collects every yielded address into a `Vec<SocketAddr>` and passes that
/// vector to `continuation` through `callback_and_continue`.
///
/// The task runs inside a `resolve` debug span that is a child of the span
/// current at the time this function is called.
///
/// # Errors
///
/// Returns an error immediately if the scenario cannot be obtained from `ctx`.
/// A failed lookup is propagated with `?` from inside the task itself, not
/// from this function.
fn lookup_host(
    ctx: NativeCallContext,
    addr: String,
    continuation: FnPtr,
) -> RhResult<Handle<Task>> {
    let parent_span = tracing::Span::current();
    let span = debug_span!(parent: parent_span, "resolve");
    let scenario = ctx.get_scenario()?;
    debug!(parent: &span, "node created");

    let resolve_task = async move {
        debug!("node started");
        let resolved = tokio::net::lookup_host(addr).await?;
        let ips: Vec<SocketAddr> = resolved.collect();
        callback_and_continue::<(Vec<SocketAddr>,)>(scenario, continuation, (ips,)).await;
        Ok(())
    };

    Ok(resolve_task.instrument(span).wrap())
}
/// Newtype around a value `R`; when `R` is an `RngCore + Unpin` generator it
/// implements `AsyncRead` by filling the read buffer from that generator.
struct RandomReader<R>(R);
impl<R: RngCore + Unpin> AsyncRead for RandomReader<R> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let this = self.get_mut();
let b = buf.initialize_unfilled();
this.0.fill_bytes(b);
let n = b.len();
buf.advance(n);
return Poll::Ready(Ok(()))
}
}
/// Creates a socket whose read half is a `RandomReader` over a freshly seeded
/// generator and whose write half is `tokio::io::empty()`.
///
/// `opts` is deserialised into a struct with one optional boolean field,
/// `fast` (defaults to `false`):
///
/// * `fast == false`: the reader uses `rand_chacha::ChaCha12Rng`;
/// * `fast == true`: the reader uses `rand_pcg::Pcg64`.
///
/// In both cases the generator is seeded via `from_rng` from the scenario's
/// shared `prng`, which is locked only for the duration of that call.
///
/// # Errors
///
/// Fails if the scenario cannot be obtained from `ctx`, or if `opts` cannot
/// be deserialised.
///
/// # Panics
///
/// Panics if locking the scenario's `prng` mutex fails (the `unwrap` on the
/// lock result).
fn random_socket(
    ctx: NativeCallContext,
    opts: Dynamic,
) -> RhResult<Handle<StreamSocket>> {
    #[derive(serde::Deserialize)]
    struct Opts {
        #[serde(default)]
        fast: bool,
    }

    let scenario = ctx.get_scenario()?;
    let parsed: Opts = rhai::serde::from_dynamic(&opts)?;
    debug!("random_socket: options parsed");

    // The explicit type lets both branches coerce to the same trait object.
    let reader: Pin<Box<dyn AsyncRead + Send + 'static>> = if parsed.fast {
        let pcg = rand_pcg::Pcg64::from_rng(&mut scenario.prng.lock().unwrap());
        Box::pin(RandomReader(pcg))
    } else {
        let chacha = rand_chacha::ChaCha12Rng::from_rng(&mut scenario.prng.lock().unwrap());
        Box::pin(RandomReader(chacha))
    };

    let socket = StreamSocket {
        read: Some(StreamRead {
            reader,
            prefix: Default::default(),
        }),
        write: Some(StreamWrite {
            writer: Box::pin(tokio::io::empty()),
        }),
        close: None,
        fd: None,
    };
    Ok(socket.wrap())
}
/// Registers this module's functions with the Rhai `engine` under the names
/// `stdio_socket`, `lookup_host` and `random_socket`.
pub fn register(engine: &mut Engine) {
    engine
        .register_fn("stdio_socket", stdio_socket)
        .register_fn("lookup_host", lookup_host)
        .register_fn("random_socket", random_socket);
}