use std::sync::Arc;
use std::time::Duration;
use derive_builder::Builder;
use parking_lot::Mutex;
use tarantool::fiber;
use crate::error::Error;
use crate::error::Result;
use super::TransportReceiver;
use super::{Transport, TransportSender};
#[derive(Builder, Debug)]
pub struct PollTransport {
#[builder(default = "default_poll_interval()")]
interval: Duration,
}
impl Default for PollTransport {
fn default() -> Self {
Self {
interval: default_poll_interval(),
}
}
}
fn default_poll_interval() -> Duration {
Duration::from_millis(10)
}
impl Transport for PollTransport {
type Sender<T> = Sender<T> where T: Send;
type Receiver<T> = Receiver<T>
where
T: Send;
fn create_channel<T: Send>(&self) -> (Self::Sender<T>, Self::Receiver<T>) {
let result: SharedResult<T> = Default::default();
(
Sender(result.clone()),
Receiver {
result,
interval: self.interval,
},
)
}
}
pub struct Sender<T>(SharedResult<T>);
impl<T: Send> TransportSender<T> for Sender<T> {
fn send(self, data: T) -> Result<()> {
*self.0.lock() = Some(data);
Ok(())
}
}
pub struct Receiver<T> {
result: SharedResult<T>,
interval: Duration,
}
impl<T: Send> TransportReceiver<T> for Receiver<T> {
fn receive(self) -> Result<T> {
loop {
{
let mut result = self.result.lock();
if let Some(result) = result.take() {
return Ok(result);
}
if Arc::strong_count(&self.result) != 2 {
return Err(Error::sender_disconnected());
}
}
fiber::sleep(self.interval)
}
}
}
type SharedResult<T> = Arc<Mutex<Option<T>>>;