use std::error::Error as StdError;
use std::sync::Arc;
use futures::Future;
use prost::Message;
use crate::proto::request::ClientReq;
use crate::proto::{Request, Response};
use crate::{Client, ClientError, asyncfn};
#[cfg(doc)]
use crate::{Table, View};
pub trait Session<E> {
fn handle_request(&self, request: &[u8]) -> impl Future<Output = Result<(), E>>;
fn close(self) -> impl Future<Output = ()>;
}
type ProxyCallbackError = Box<dyn StdError + Send + Sync>;
type ProxyCallback = Arc<dyn Fn(&[u8]) -> Result<(), ProxyCallbackError> + Send + Sync>;
#[derive(Clone)]
pub struct ProxySession {
parent: Client,
callback: ProxyCallback,
}
impl ProxySession {
pub fn new(
client: Client,
send_response: impl Fn(&[u8]) -> Result<(), ProxyCallbackError> + Send + Sync + 'static,
) -> Self {
ProxySession {
parent: client,
callback: Arc::new(send_response),
}
}
}
fn encode(response: Response, callback: ProxyCallback) -> Result<(), ClientError> {
let mut enc = vec![];
response.encode(&mut enc)?;
callback(&enc).map_err(|x| ClientError::Unknown(x.to_string()))?;
Ok(())
}
impl Session<ClientError> for ProxySession {
async fn handle_request(&self, request: &[u8]) -> Result<(), ClientError> {
let req = Request::decode(request)?;
let callback = self.callback.clone();
match req.client_req.as_ref() {
Some(ClientReq::ViewOnUpdateReq(_)) => {
let on_update =
asyncfn!(callback, async move |response| encode(response, callback));
self.parent.subscribe(&req, on_update).await?
},
Some(_) => {
let on_update = move |response| encode(response, callback);
self.parent
.subscribe_once(&req, Box::new(on_update))
.await?
},
None => {
return Err(ClientError::Internal(
"ProxySession::handle_request: invalid request".to_string(),
));
},
};
Ok(())
}
async fn close(self) {}
}