use std::pin::pin;
use std::sync::Arc;
use futures::future::BoxFuture;
use perspective_server::{Server, ServerResult};
use pollster::FutureExt;
use pyo3::IntoPyObjectExt;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyAny;
use super::PyAsyncSession;
use super::session_async::PyConnection;
use crate::py_async::AllowThreads;
/// Python-facing handle to a [`Server`].
///
/// Exposed to Python as a subclassable class in the `perspective` module.
#[pyclass(subclass, module = "perspective")]
#[derive(Clone)]
pub struct AsyncServer {
/// The wrapped server instance that this class's methods operate on.
pub server: Server,
}
#[pymethods]
impl AsyncServer {
    /// Creates a new `AsyncServer`.
    ///
    /// `on_poll_request`, when provided, is a Python callable handed to the
    /// underlying [`Server`] as its poll-request callback. Each time the
    /// server invokes it, the callable receives a single argument: a new
    /// `AsyncServer` wrapping a clone of the server that issued the request.
    ///
    /// Any failure while building that argument or while calling the Python
    /// callable is returned to the server as an error.
    #[new]
    #[pyo3(signature = (on_poll_request=None))]
    pub fn new(on_poll_request: Option<Py<PyAny>>) -> Self {
        Self {
            server: Server::new(on_poll_request.map(|f| {
                // Shared through an `Arc` so that every invocation can move
                // its own handle to the callable into the returned future.
                let f = Arc::new(f);
                Arc::new(move |server: &Server| {
                    let f = f.clone();
                    let server = server.clone();
                    Box::pin(async move {
                        Python::with_gil(|py| {
                            // Propagate a failed conversion as an error
                            // instead of panicking inside the server-driven
                            // future.
                            let py_server = AsyncServer { server }.into_py_any(py)?;
                            f.call1(py, (py_server,))
                        })?;
                        Ok(())
                    }) as BoxFuture<'static, ServerResult<()>>
                })
                    as Arc<dyn Fn(&Server) -> BoxFuture<'static, ServerResult<()>> + Send + Sync>
            })),
        }
    }

    /// Opens a new session on this server.
    ///
    /// `response_cb` is wrapped in a [`PyConnection`] and passed to the
    /// server as the session's connection. The calling thread blocks (via
    /// `block_on`) until the server has produced the session.
    ///
    /// The returned [`PyAsyncSession`] holds the session behind an
    /// `Arc<async_lock::RwLock<Option<_>>>`.
    pub fn new_session(&self, _py: Python, response_cb: Py<PyAny>) -> PyAsyncSession {
        let session = self
            .server
            .new_session(PyConnection(response_cb.into()))
            .block_on();

        let session = Arc::new(async_lock::RwLock::new(Some(session)));
        PyAsyncSession { session }
    }

    /// Awaits [`Server::poll`] on the wrapped server.
    ///
    /// The future is driven through `AllowThreads`.
    /// NOTE(review): presumably this releases the GIL while the future is
    /// polled — confirm against `crate::py_async`.
    ///
    /// # Errors
    ///
    /// Any error reported by the server is raised in Python as a
    /// `ValueError` carrying the error's display text.
    pub async fn poll(&self) -> PyResult<()> {
        AllowThreads(pin!(async move {
            self.server
                .poll()
                .await
                .map_err(|e| PyValueError::new_err(e.to_string()))
        }))
        .await
    }
}