use super::*;
ghost_actor::ghost_chan! {
pub(crate) chan LowLevelWireApi<LairError> {
fn low_level_send(msg: LairWire) -> ();
}
}
pub(crate) type LowLevelWireSender =
futures::channel::mpsc::Sender<LowLevelWireApi>;
pub(crate) type LowLevelWireReceiver =
futures::channel::mpsc::Receiver<LowLevelWireApi>;
pub(crate) fn spawn_low_level_write_half(
kill_switch: KillSwitch,
mut write_half: IpcWrite,
) -> LairResult<LowLevelWireSender> {
let (s, mut r) = futures::channel::mpsc::channel(10);
err_spawn("ll-write", async move {
while let Ok(msg) = kill_switch
.mix(async {
r.next()
.await
.ok_or_else::<LairError, _>(|| "stream end".into())
})
.await
{
match msg {
LowLevelWireApi::LowLevelSend { respond, msg, .. } => {
let res = kill_switch
.mix(async {
let msg_enc = msg.encode()?;
write_half
.write_all(&msg_enc)
.await
.map_err(LairError::other)?;
trace!("ll wrote {:?}", msg);
Ok(())
})
.await;
let should_break = res.is_err();
respond.respond(Ok(async move { res }.boxed().into()));
if should_break {
break;
}
}
}
}
LairResult::<()>::Ok(())
});
Ok(s)
}
pub(crate) fn spawn_low_level_read_half(
kill_switch: KillSwitch,
mut read_half: IpcRead,
) -> LairResult<LowLevelWireReceiver> {
let (s, r) = futures::channel::mpsc::channel(10);
err_spawn("ll-read", async move {
let mut pending_data = Vec::new();
let mut buffer = [0_u8; 4096];
loop {
trace!("ll read tick");
let read = kill_switch
.mix(async {
read_half.read(&mut buffer).await.map_err(LairError::other)
})
.await?;
trace!(?read, "ll read count");
if read == 0 {
trace!("ll read end");
return Err("read returned 0 bytes".into());
}
pending_data.extend_from_slice(&buffer[..read]);
while let Ok(size) = LairWire::peek_size(&pending_data) {
trace!(?size, "ll read peek size");
if pending_data.len() < size {
break;
}
let msg = LairWire::decode(&pending_data)?;
let _ = pending_data.drain(..size);
trace!("ll read {:?}", msg);
let weak_kill_switch = kill_switch.weak();
let task_sender = s.clone();
tokio::task::spawn(async move {
let _ = weak_kill_switch
.mix(task_sender.low_level_send(msg))
.await;
trace!("ll read send done");
});
}
}
});
Ok(r)
}