use std::fmt::Debug;
use reactive_messaging::prelude::*;
use std::future;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::time::{SystemTime, UNIX_EPOCH};
use futures::Stream;
use futures::StreamExt;
pub fn now_as_micros() -> u64 {
let start_time = SystemTime::now();
start_time.duration_since(UNIX_EPOCH).expect("Time went backwards?!?!").as_micros() as u64
}
pub fn last_micros_probed_protocol_events_handler<const CONFIG: u64,
LocalMessages: ReactiveMessagingConfig<LocalMessages> + Send + Sync + PartialEq + Debug + 'static,
SenderChannel: FullDuplexUniChannel<ItemType=LocalMessages, DerivedItemType=LocalMessages> + Send + Sync,
StateType: Send + Sync + Clone + Debug + 'static>
()
-> (/*probed_protocol_events_handler*/ impl Fn(/*event: */ProtocolEvent<CONFIG, LocalMessages, SenderChannel, StateType>) -> Pin<Box<dyn Future<Output=()> + Send + Sync>> + Send + Sync,
/*last_peer_arrived_notification_micros*/ Arc<AtomicU64>,
/*last_peer_left_notification_micros*/ Arc<AtomicU64>,
/*last_local_service_termination_notification_micros*/ Arc<AtomicU64> ) {
let last_peer_arrived_notification_micros = Arc::new(AtomicU64::new(0));
let last_peer_left_notification_micros = Arc::new(AtomicU64::new(0));
let last_local_service_termination_notification_micros = Arc::new(AtomicU64::new(0));
let last_peer_arrived_notification_micros_ref = Arc::clone(&last_peer_arrived_notification_micros);
let last_peer_left_notification_micros_ref = Arc::clone(&last_peer_left_notification_micros);
let last_local_service_termination_notification_micros_ref = Arc::clone(&last_local_service_termination_notification_micros);
(
move |event| {
match event {
ProtocolEvent::PeerArrived { peer: _ } => last_peer_arrived_notification_micros_ref.store(now_as_micros(), Relaxed),
ProtocolEvent::PeerLeft { peer: _, stream_stats: _ } => last_peer_left_notification_micros_ref.store(now_as_micros(), Relaxed),
ProtocolEvent::LocalServiceTermination => last_local_service_termination_notification_micros_ref.store(now_as_micros(), Relaxed),
}
Box::pin(future::ready(()))
},
last_peer_arrived_notification_micros,
last_peer_left_notification_micros,
last_local_service_termination_notification_micros
)
}
pub fn last_micros_probed_protocol_processor_builder<StreamItemType: Debug,
InputStreamType: Stream<Item=StreamItemType> + Send + 'static>
()
-> (/*probed_protocol_processor_builder*/ impl Fn(/*remote_messages_stream: */InputStreamType) -> Pin<Box<dyn Stream<Item=StreamItemType> + Send>> + Send + Sync,
/*last_remote_message_micros*/ Arc<AtomicU64> ) {
let last_remote_message_micros = Arc::new(AtomicU64::new(0));
let last_remote_message_micros_ref = Arc::clone(&last_remote_message_micros);
(
move |remote_messages_stream| {
let last_remote_message_micros_ref = Arc::clone(&last_remote_message_micros_ref);
Box::pin( remote_messages_stream.inspect(move |_server_message| {
last_remote_message_micros_ref.store(now_as_micros(), Relaxed);
}) )
},
last_remote_message_micros
)
}