use futures::{channel::mpsc, Stream, StreamExt};
use tower::{Service, ServiceExt};
use crate::{Action, ActionService, Frame, FrameFuture, FrameOutbox};
pub type ActiveAction = Action<super::Active>;
impl ActiveAction {
pub fn active(service: ActionService) -> Self {
Self {
action_state: service.into(),
}
}
pub fn into_inner(self) -> ActionService {
self.action_state.0
}
pub fn handle_frame(&mut self, frame: Frame) -> FrameFuture {
FrameFuture::new(self.action_state.0.call(frame))
}
pub fn into_stream(self) -> (impl Stream<Item = crate::Result<Frame>>, FrameOutbox) {
let (tx, rx) = mpsc::channel(16);
let mut stream = self.call_all(rx);
let handle = FrameOutbox::new(tx);
let stream = async_stream::stream! {
while let Some(frame) = stream.next().await {
yield Ok(frame.map_err(crate::errors::StreamError)?);
}
};
(stream, handle)
}
}
impl From<ActionService> for ActiveAction {
fn from(action_service: ActionService) -> Self {
Self::active(action_service)
}
}
impl<IntoFrame> Service<IntoFrame> for ActiveAction
where
IntoFrame: Into<Frame> + Clone + Send + 'static,
{
type Response = Frame;
type Error = crate::Error;
type Future = FrameFuture;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.action_state.0.poll_ready(cx)
}
fn call(&mut self, frame: IntoFrame) -> Self::Future {
self.handle_frame(frame.into())
}
}