ax_core 0.3.2

Core library implementing the functions of ax
Documentation
use ax_types::{
    service::{SubscribeRequest, SubscribeResponse},
    AppId,
};
use futures::{
    stream::{self, BoxStream},
    FutureExt, StreamExt,
};
use wsrpc::Service;

use crate::api::events::service::EventService;

pub struct Subscribe {
    event_service: EventService,
}

impl Service for Subscribe {
    type Req = SubscribeRequest;
    type Resp = SubscribeResponse;
    type Error = String;
    type Ctx = AppId;

    fn serve(&self, app_id: AppId, req: Self::Req) -> BoxStream<'static, Result<Self::Resp, Self::Error>> {
        let service = self.event_service.clone();
        async move {
            service
                .subscribe(app_id, req)
                .map(move |x| match x {
                    Ok(stream) => stream.map(Ok).left_stream(),
                    Err(e) => stream::once(futures::future::err(e.to_string())).right_stream(),
                })
                .await
        }
        .flatten_stream()
        .boxed()
    }
}

pub fn service(event_service: EventService) -> Subscribe {
    Subscribe { event_service }
}