use crate::chip::Chip;
use crate::line::{EdgeEvent, InfoChangeEvent};
use crate::request::{EdgeEventBuffer, Request};
use crate::Result;
use futures::ready;
use futures::task::{Context, Poll};
use std::pin::Pin;
use tokio::io::unix::AsyncFd;
use tokio_stream::Stream;
/// Async wrapper around a [`Chip`] for use with tokio.
///
/// The chip is held inside a tokio [`AsyncFd`] so that readiness of its file
/// descriptor can be awaited instead of polled synchronously.
#[derive(Debug)]
pub struct AsyncChip(AsyncFd<Chip>);
impl AsyncChip {
    /// Wraps `chip` in an [`AsyncFd`] so its events can be awaited.
    ///
    /// # Panics
    ///
    /// Panics if [`AsyncFd::new`] returns an error, i.e. the chip's file
    /// descriptor could not be registered with the reactor. `AsyncFd::new`
    /// must be called from within a tokio runtime context.
    pub fn new(chip: Chip) -> Self {
        AsyncChip(
            AsyncFd::new(chip).expect("failed to register the Chip fd with the tokio reactor"),
        )
    }

    /// Waits for and returns the next line info change event from the chip.
    ///
    /// # Errors
    ///
    /// Returns an error if waiting for readability fails, or if checking for
    /// or reading the event from the underlying [`Chip`] fails.
    pub async fn read_line_info_change_event(&self) -> Result<InfoChangeEvent> {
        loop {
            let mut guard = self.0.readable().await?;
            let chip = self.0.get_ref();
            if !chip.has_line_info_change_event()? {
                // The cached readiness is stale or spurious. It must be
                // cleared, otherwise `readable()` resolves immediately again
                // and this loop spins until an event actually arrives.
                guard.clear_ready();
                continue;
            }
            let res = chip.read_line_info_change_event();
            // Only drop the cached readiness once the last pending event has
            // been consumed, so that queued events are not left waiting.
            if !chip.has_line_info_change_event()? {
                guard.clear_ready();
            }
            return res;
        }
    }

    /// Returns a [`Stream`] of line info change events that borrows this chip.
    pub fn info_change_events(&self) -> InfoChangeStream<'_> {
        InfoChangeStream { chip: self }
    }
}
impl AsRef<Chip> for AsyncChip {
fn as_ref(&self) -> &Chip {
self.0.get_ref()
}
}
impl From<AsyncChip> for Chip {
fn from(c: AsyncChip) -> Chip {
c.0.into_inner()
}
}
impl From<Chip> for AsyncChip {
fn from(c: Chip) -> AsyncChip {
AsyncChip::new(c)
}
}
/// Stream of line info change events read from a borrowed [`AsyncChip`].
///
/// Created by [`AsyncChip::info_change_events`].
pub struct InfoChangeStream<'a> {
// The chip the events are read from.
chip: &'a AsyncChip,
}
impl Stream for InfoChangeStream<'_> {
    type Item = Result<InfoChangeEvent>;

    /// Polls for the next line info change event.
    ///
    /// Yields `Some(Err(_))` if polling the fd, checking for an event, or
    /// reading the event fails. This implementation never yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let async_chip = self.chip;
        let chip = async_chip.0.get_ref();
        loop {
            let mut guard = ready!(async_chip.0.poll_read_ready(cx))?;
            if !chip.has_line_info_change_event()? {
                // Readiness can be stale or spurious. Reading now could block
                // the executor thread if the underlying read is blocking, so
                // clear the cached readiness and poll again; that either
                // observes fresh readiness or registers the waker and
                // returns `Pending`.
                guard.clear_ready();
                continue;
            }
            let res = chip.read_line_info_change_event();
            // Keep the readiness cached while further events remain queued.
            if !chip.has_line_info_change_event()? {
                guard.clear_ready();
            }
            return Poll::Ready(Some(res));
        }
    }
}
/// Async wrapper around a [`Request`] for use with tokio.
///
/// The request is held inside a tokio [`AsyncFd`] so that readiness of its
/// file descriptor can be awaited instead of polled synchronously.
#[derive(Debug)]
pub struct AsyncRequest(AsyncFd<Request>);
impl AsyncRequest {
    /// Wraps `req` in an [`AsyncFd`] so its edge events can be awaited.
    ///
    /// # Panics
    ///
    /// Panics if [`AsyncFd::new`] returns an error, i.e. the request's file
    /// descriptor could not be registered with the reactor. `AsyncFd::new`
    /// must be called from within a tokio runtime context.
    pub fn new(req: Request) -> Self {
        AsyncRequest(
            AsyncFd::new(req).expect("failed to register the Request fd with the tokio reactor"),
        )
    }

    /// Waits for and returns the next edge event from the request.
    ///
    /// # Errors
    ///
    /// Returns an error if waiting for readability fails, or if checking for
    /// or reading the event from the underlying [`Request`] fails.
    pub async fn read_edge_event(&self) -> Result<EdgeEvent> {
        loop {
            let mut guard = self.0.readable().await?;
            let req = self.0.get_ref();
            if !req.has_edge_event()? {
                // The cached readiness is stale or spurious. It must be
                // cleared, otherwise `readable()` resolves immediately again
                // and this loop spins until an event actually arrives.
                guard.clear_ready();
                continue;
            }
            let res = req.read_edge_event();
            // Only drop the cached readiness once the last pending event has
            // been consumed, so that queued events are not left waiting.
            if !req.has_edge_event()? {
                guard.clear_ready();
            }
            return res;
        }
    }

    /// Waits until at least one edge event is available, then reads events
    /// into `buf` using [`Request::read_edge_events_into_slice`].
    ///
    /// The returned `usize` is whatever the underlying call returns
    /// (presumably the amount of `buf` that was filled; confirm against
    /// `Request`).
    ///
    /// # Errors
    ///
    /// Returns an error if waiting for readability fails, or if checking for
    /// or reading events from the underlying [`Request`] fails.
    pub async fn read_edge_events_into_slice(&self, buf: &mut [u64]) -> Result<usize> {
        loop {
            let mut guard = self.0.readable().await?;
            let req = self.0.get_ref();
            if !req.has_edge_event()? {
                // Stale or spurious readiness: clear it so the next
                // `readable()` actually waits instead of busy-looping.
                guard.clear_ready();
                continue;
            }
            let res = req.read_edge_events_into_slice(buf);
            if !req.has_edge_event()? {
                guard.clear_ready();
            }
            return res;
        }
    }

    /// Returns a [`Stream`] of edge events backed by a new buffer created
    /// with `Request::new_edge_event_buffer(capacity)`.
    pub fn new_edge_event_stream(&self, capacity: usize) -> EdgeEventStream<'_> {
        EdgeEventStream {
            req: self,
            events: self.0.get_ref().new_edge_event_buffer(capacity),
        }
    }

    /// Returns a [`Stream`] of edge events backed by the buffer returned by
    /// `Request::edge_events()`.
    pub fn edge_events(&self) -> EdgeEventStream<'_> {
        EdgeEventStream {
            req: self,
            events: self.0.get_ref().edge_events(),
        }
    }
}
impl AsRef<Request> for AsyncRequest {
fn as_ref(&self) -> &Request {
self.0.get_ref()
}
}
impl From<AsyncRequest> for Request {
fn from(r: AsyncRequest) -> Request {
r.0.into_inner()
}
}
impl From<Request> for AsyncRequest {
fn from(r: Request) -> AsyncRequest {
AsyncRequest::new(r)
}
}
/// Stream of edge events read from a borrowed [`AsyncRequest`].
///
/// Created by [`AsyncRequest::edge_events`] or
/// [`AsyncRequest::new_edge_event_stream`].
pub struct EdgeEventStream<'a> {
// The request whose fd readiness is polled.
req: &'a AsyncRequest,
// Buffer the events are read through; it is checked for already-held
// events before the fd is polled.
events: EdgeEventBuffer<'a>,
}
impl Stream for EdgeEventStream<'_> {
    type Item = Result<EdgeEvent>;

    /// Polls for the next edge event.
    ///
    /// If the buffer is not empty the event is taken from it without polling
    /// the fd. Yields `Some(Err(_))` if polling the fd, checking for an
    /// event, or reading the event fails. This implementation never yields
    /// `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if !self.events.is_empty() {
            return Poll::Ready(Some(self.events.read_event()));
        }
        // Copy the shared reference out so the readiness guard does not hold
        // a borrow of `self` while the buffer is mutated below.
        let req = self.req;
        loop {
            let mut guard = ready!(req.0.poll_read_ready(cx))?;
            if !req.0.get_ref().has_edge_event()? {
                // Readiness can be stale or spurious. Reading now could block
                // the executor thread if the underlying read is blocking, so
                // clear the cached readiness and poll again; that either
                // observes fresh readiness or registers the waker and
                // returns `Pending`.
                guard.clear_ready();
                continue;
            }
            let res = self.events.read_event();
            // Keep the readiness cached while the request still has events
            // pending.
            if !req.0.get_ref().has_edge_event()? {
                guard.clear_ready();
            }
            return Poll::Ready(Some(res));
        }
    }
}