use std::fmt;
use tracing::Instrument;
use super::{
ReadLock, RwLock,
msg::{ReadRequest, Value, WriteRequest},
};
use crate::{
RemoteSend, codec, exec,
exec::task::JoinHandle,
rch::{mpsc, watch},
};
pub struct Owner<T, Codec = codec::Default> {
task: Option<JoinHandle<T>>,
rw_lock: RwLock<T, Codec>,
term_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl<T, Codec> fmt::Debug for Owner<T, Codec> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Owner").finish()
}
}
impl<T, Codec> Owner<T, Codec>
where
T: RemoteSend + Clone + Sync,
Codec: codec::Codec,
{
pub fn new(mut value: T) -> Self {
let (read_req_tx, read_req_rx) = mpsc::channel(1);
let read_req_tx = read_req_tx.set_buffer();
let read_req_rx = read_req_rx.set_buffer();
let (write_req_tx, write_req_rx) = mpsc::channel(1);
let write_req_tx = write_req_tx.set_buffer();
let write_req_rx = write_req_rx.set_buffer();
let (term_tx, term_rx) = tokio::sync::oneshot::channel();
let task = exec::spawn(
async move {
tokio::select! {
_ = Self::owner_task(&mut value, read_req_rx, write_req_rx) => (),
_ = term_rx => (),
}
value
}
.in_current_span(),
);
let read_lock = ReadLock::new(read_req_tx);
let rw_lock = RwLock::new(read_lock, write_req_tx);
Self { task: Some(task), rw_lock, term_tx: Some(term_tx) }
}
async fn owner_task(
value: &mut T, mut read_req_rx: mpsc::Receiver<ReadRequest<T, Codec>, Codec, 1>,
mut write_req_rx: mpsc::Receiver<WriteRequest<T, Codec>, Codec, 1>,
) {
let (dropped_tx, dropped_rx) = mpsc::channel(1);
let mut dropped_tx = dropped_tx.set_buffer();
let mut dropped_rx = dropped_rx.set_buffer::<1>();
let (mut invalid_tx, mut invalid_rx) = watch::channel(false);
loop {
tokio::select! {
biased;
res = write_req_rx.recv() => {
let WriteRequest {value_tx, new_value_rx, confirm_tx} = match res {
Ok(Some(req)) => req,
Ok(None) => break,
Err(err) if err.is_final() => break,
Err(_) => continue,
};
let _ = invalid_tx.send(true);
drop(dropped_tx);
loop {
if let Ok(None) = dropped_rx.recv().await {
break;
}
}
let (new_dropped_tx, new_dropped_rx) = mpsc::channel(1);
let new_dropped_tx = new_dropped_tx.set_buffer();
let new_dropped_rx = new_dropped_rx.set_buffer();
dropped_tx = new_dropped_tx;
dropped_rx = new_dropped_rx;
let (new_invalid_tx, new_invalid_rx) = watch::channel(false);
invalid_tx = new_invalid_tx;
invalid_rx = new_invalid_rx;
let _ = value_tx.send(value.clone());
if let Ok(nv) = new_value_rx.await {
*value = nv;
let _ = confirm_tx.send(());
}
},
res = read_req_rx.recv() => {
let ReadRequest {value_tx} = match res {
Ok(Some(req)) => req,
Ok(None) => break,
Err(err) if err.is_final() => break,
Err(_) => continue,
};
let v = Value {
value: value.clone(),
dropped_tx: dropped_tx.clone(),
invalid_rx: invalid_rx.clone(),
};
let _ = value_tx.send(v);
},
}
}
}
pub async fn into_inner(mut self) -> T {
let _ = self.term_tx.take().unwrap().send(());
self.task.take().unwrap().await.unwrap()
}
pub fn rw_lock(&self) -> RwLock<T, Codec> {
self.rw_lock.clone()
}
pub fn read_lock(&self) -> ReadLock<T, Codec> {
self.rw_lock.read_lock()
}
}
impl<T, Codec> Drop for Owner<T, Codec> {
fn drop(&mut self) {
}
}