use futures_util::FutureExt;
use pyo3::{prelude::*, wrap_pyfunction};
use tungstenite::Message as WsMessage;
use crate::server::{self, consumer_state::{self}};
use consumer_state as cs;
/// Starts the server if it is not already running.
///
/// Returns `true` when `server::start()` reports success. Returns `false`
/// when a server is already running (in which case an error string is handed
/// to `consumer_state::weakly_record_error`) or when `server::start()` fails.
#[pyfunction]
pub fn start_server() -> bool {
    // Guard: starting twice is treated as a caller error, not a no-op.
    if is_server_running() {
        let already_running =
            "Server is already running, can't invoke start_server().".to_string();
        consumer_state::weakly_record_error(already_running);
        return false;
    }

    if server::start().is_ok() {
        println!("Server started.");
        true
    } else {
        false
    }
}
/// Reports the server-alive flag currently held by `cs::CS_SER_ALIVE_RX`.
///
/// Returns `false` (after printing a diagnostic) when `cs::read` yields no
/// value for the receiver.
#[pyfunction]
pub fn is_server_running() -> bool {
    let alive = cs::read(&cs::CS_SER_ALIVE_RX, |rx| {
        println!("Returning server alive: {}", *rx.borrow());
        *rx.borrow()
    });

    match alive {
        Some(flag) => flag,
        None => {
            println!("Failed to get server alive!");
            false
        }
    }
}
/// Requests a server shutdown by sending `true` through
/// `cs::CS_SER_REQ_SHUTDOWN_TX`.
///
/// Prints a warning when `cs::mutate` yields no value for the transmitter.
/// The outcome of the `send` call itself is not inspected.
#[pyfunction]
pub fn shutdown_server() {
    let request_dispatched = cs::mutate(&cs::CS_SER_REQ_SHUTDOWN_TX, |tx| tx.send(true)).is_some();
    if !request_dispatched {
        println!("[api.rs] Warning! Failed to send shutdown request.");
    }
}
#[pyfunction]
pub fn get_last_error_string() -> Option<String> {
consumer_state::try_get_last_error()
}
/// Collects every new-client event that is immediately available on
/// `cs::CS_CLI_CONN_RX`, without waiting for more to arrive.
///
/// The work runs inside `py.allow_threads`. Returns an empty list when
/// `cs::mutate` yields no value for the receiver.
#[pyfunction]
pub fn drain_new_client_events(py: Python) -> Vec<String> {
    py.allow_threads(|| {
        cs::mutate(&cs::CS_CLI_CONN_RX, |rx| {
            // Poll `recv()` exactly once per item; stop at the first poll that
            // is either not ready or resolves to `None`.
            std::iter::from_fn(|| rx.recv().now_or_never().flatten()).collect::<Vec<_>>()
        })
        .unwrap_or_default()
    })
}
/// A message body exchanged with Python: either text or raw bytes.
///
/// `FromPyObject` is derived so a Python argument can be extracted into one
/// of the variants below; `try_send_messages` accepts a list of these.
// NOTE(review): per the PyO3 guide the derive tries variants in declaration
// order and uses `annotation` in extraction error messages — confirm against
// the PyO3 version in use before reordering.
#[derive(FromPyObject)]
pub enum MessagePayload {
// Text payload, annotated as Python `str`.
#[pyo3(transparent, annotation = "str")]
Text(String),
// Binary payload, annotated as Python `bytes`.
#[pyo3(transparent, annotation = "bytes")]
Binary(Vec<u8>)
}
/// Converts a payload back into a Python object: `Text` becomes a Python
/// unicode string and `Binary` becomes a Python `bytes` object.
impl IntoPy<PyObject> for MessagePayload {
    fn into_py(self, py: Python) -> PyObject {
        use pyo3::types::{PyBytes, PyUnicode};

        match self {
            Self::Text(text) => PyUnicode::new(py, &text).into(),
            Self::Binary(bytes) => PyBytes::new(py, &bytes).into(),
        }
    }
}
/// Converts `messages` to WebSocket messages and hands them, as one batch, to
/// the transmitter stored in `cs::CS_SER_MSG_TX`.
///
/// The work runs inside `py.allow_threads`.
///
/// # Errors
///
/// Raises a Python `BaseException` when:
/// * `cs::read` yields no value for the transmitter, or
/// * the transmitter's `send` call returns an error.
///
/// The second case used to fall through to `Ok(())`, so a failed send was
/// silently reported to Python as success.
#[pyfunction]
pub fn try_send_messages(py: Python, messages: Vec<MessagePayload>) -> PyResult<()> {
    py.allow_threads(|| {
        let messages: Vec<WsMessage> = messages
            .into_iter()
            .map(|msg| match msg {
                MessagePayload::Text(text) => WsMessage::Text(text),
                MessagePayload::Binary(bytes) => WsMessage::Binary(bytes),
            })
            .collect();

        let send_res = cs::read(&cs::CS_SER_MSG_TX, |tx| tx.send(messages));

        // Both failure modes produce an exception; only the detail text differs.
        let details = match send_res {
            Some(Ok(_)) => return Ok(()),
            None => "Error reading server state for transmitter".to_string(),
            Some(Err(err)) => err.to_string(),
        };

        Err(pyo3::exceptions::PyBaseException::new_err(format!(
            "Failed to send message. Details: {}",
            details
        )))
    })
}
/// Collects every client message that is immediately available on
/// `cs::CS_CLI_MSG_RX`, without waiting for more to arrive.
///
/// Text and binary messages are returned as `MessagePayload` values; ping,
/// pong and close messages are consumed from the receiver but dropped. The
/// work runs inside `py.allow_threads`. Returns an empty list when
/// `cs::mutate` yields no value for the receiver.
#[pyfunction]
pub fn drain_client_messages(py: Python) -> Vec<MessagePayload> {
    py.allow_threads(|| {
        cs::mutate(&cs::CS_CLI_MSG_RX, |rx| {
            // Poll `recv()` exactly once per item; stop at the first poll that
            // is either not ready or resolves to `None`.
            std::iter::from_fn(|| rx.recv().now_or_never().flatten())
                .filter_map(|incoming| match incoming {
                    WsMessage::Text(text) => Some(MessagePayload::Text(text)),
                    WsMessage::Binary(bytes) => Some(MessagePayload::Binary(bytes)),
                    // Control frames carry nothing to surface to Python.
                    WsMessage::Ping(_) => None,
                    WsMessage::Pong(_) => None,
                    WsMessage::Close(_) => None,
                })
                .collect::<Vec<_>>()
        })
        .unwrap_or_default()
    })
}
/// Python module definition for `quicksocket`.
///
/// Registers each `#[pyfunction]` defined in this file on the module so it is
/// callable from Python.
#[pymodule]
fn quicksocket(_py: Python, m: &PyModule) -> PyResult<()> {
// Server lifecycle and error reporting.
m.add_function(wrap_pyfunction!(start_server, m)?)?;
m.add_function(wrap_pyfunction!(is_server_running, m)?)?;
m.add_function(wrap_pyfunction!(shutdown_server, m)?)?;
m.add_function(wrap_pyfunction!(get_last_error_string, m)?)?;
// Client events and message traffic.
m.add_function(wrap_pyfunction!(drain_new_client_events, m)?)?;
m.add_function(wrap_pyfunction!(try_send_messages, m)?)?;
m.add_function(wrap_pyfunction!(drain_client_messages, m)?)?;
Ok(())
}