use super::{
types::*,
connection::SocketEvent,
protocol_model::{ClientMessages, ServerMessages},
};
use std::{
sync::Arc,
time::Duration,
};
use std::future::Future;
use futures::{stream, Stream, StreamExt, SinkExt};
use log::{debug, warn};
use tokio::sync::mpsc::error::TrySendError;
use crate::frontend::socket_server::connection::Peer;
use crate::logic::ogre_robot::types::DisconnectionReason;
/// Capacity, in events, requested for the bounded channels created by the
/// stream factories in this module.
pub const SENDER_BUFFER: usize = 8 * 1024;
/// Concurrency limit.
// NOTE(review): not referenced in the visible part of this file -- presumably consumed
// by the stream processors elsewhere; confirm against the callers.
pub const CONCURRENCY: usize = 16;
/// Builds a `futures` bounded-channel backed stream of socket events, together with the
/// synchronous closures used to feed and to close it.
///
/// The channel is created with [`SENDER_BUFFER`] as its buffer size. `_tokio_runtime` is
/// accepted but not used by this implementation.
///
/// Returns a `(stream, producer, closer)` tuple:
///   - `stream`: yields every event handed to `producer`;
///   - `producer`: blocks the calling thread (through `futures::executor::block_on`) until
///     the channel accepts the event, then returns `true` -- it never returns `false`;
///   - `closer`: calls `close_channel()` on a clone of the sender.
///
/// # Panics
///
/// `producer` panics if feeding the event into the channel fails.
pub fn sync_futures_stream(_tokio_runtime: Arc<tokio::runtime::Runtime>)
                          -> (impl Stream<Item = SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>>,     // stream of client requests
                              impl FnMut(SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>) -> bool,     // producer of client requests (adds to the stream)
                              impl FnMut()) {
    type Event = SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>;

    let (mut sender, receiver) = futures::channel::mpsc::channel::<Event>(SENDER_BUFFER);
    // the closer needs its own handle, since `sender` is moved into the producer closure
    let mut closing_sender = sender.clone();

    let producer = move |event: Event| {
        // synchronous API: park the calling thread until the channel takes the event
        futures::executor::block_on(sender.feed(event))
            .expect("Could not send Socket Server network event. Did the `Stream` upgraded by `processor::processor` end, for some reason?");
        true
    };
    let closer = move || {
        closing_sender.close_channel();
    };

    (receiver, producer, closer)
}
pub fn sync_tokio_stream(_tokio_runtime: Arc<tokio::runtime::Runtime>)
-> (impl Stream<Item = SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>>, // stream of client requests
impl Fn(SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>) -> bool, // producer of client requests (adds to the stream)
impl Fn()) {
let (tx, mut rx) = tokio::sync::mpsc::channel::<SocketEvent<ClientMessages, ServerMessages, DisconnectionReason>>(SENDER_BUFFER);
let stream = stream::poll_fn(move |cx| rx.poll_recv(cx));
(
stream,
move |incoming| match tx.try_send(incoming) {
Ok(_) => true,
Err(err) => match err {
TrySendError::Full(_) => false,
TrySendError::Closed(err) => panic!("Could not send Socket Server network event. The `Stream` upgraded by `processor::processor` closed: {:?}", err),
}
},
move || std::thread::sleep(Duration::from_secs(5)),
)
}
/// Spawns a Tokio task that drives `stream` to completion and returns its join handle.
///
/// Each item is a `(peer, send_status)` pair; whenever `send_status` is `false`, a warning
/// naming the peer's address and id is logged. Successful items are consumed silently.
/// A debug line is logged when the task starts and a warning when the stream ends.
///
/// Being an `async fn`, nothing is spawned until the returned future is awaited.
pub async fn spawn_stream_executor(stream: impl Stream<Item = (Arc<Peer<ServerMessages, DisconnectionReason>>, bool)> + Send + Sync + 'static) -> tokio::task::JoinHandle<()> {
    let executor = async move {
        debug!("Experimental Stream Executor started!");
        stream
            .for_each(|(peer, send_status)| async move {
                if send_status {
                    // the response went out fine: nothing to report
                    return;
                }
                warn!("Experimental Stream Executor faced a bad time sending a response back to {:?} (peer id {}): result: {:?}", peer.peer_address, peer.peer_id, send_status);
            })
            .await;
        warn!("Experimental Executor ended!");
    };
    tokio::spawn(executor)
}