use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use dashmap::DashMap;
use dupe::Dupe;
use crate::arc_erase::ArcEraseDyn;
use crate::storage::data::DataKey;
use crate::storage::data::PagableData;
use crate::storage::support::SerializerForPaging;
use crate::storage::traits::DeserializedArcCache;
use crate::storage::traits::PagableStorage;
use crate::traits::SessionContext;
pub struct InMemoryPagableStorage {
pending: InMemoryPagableStoragePendingPageOut,
handle: Arc<InMemoryPagableStorageHandle>,
session_context: SessionContext,
}
struct InMemoryPagableStoragePendingPageOut {
pending_messages: std::sync::mpsc::Receiver<Box<dyn ArcEraseDyn>>,
pending: Vec<Box<dyn ArcEraseDyn>>,
}
struct InMemoryPagableStorageCache {
data: DashMap<DataKey, Arc<PagableData>>,
arcs: DeserializedArcCache,
is_dropped: AtomicBool,
}
impl InMemoryPagableStorageCache {
fn new() -> Self {
Self {
data: DashMap::new(),
arcs: DeserializedArcCache::new(),
is_dropped: AtomicBool::new(false),
}
}
fn dropped(&self) {
self.is_dropped.store(true, Ordering::Release);
self.data.clear();
self.arcs.clear();
}
fn insert_data(&self, key: DataKey, data: Arc<PagableData>) {
self.data.insert(key, data);
if self.is_dropped.load(Ordering::Acquire) {
self.data.clear();
}
}
fn get_data(&self, key: &DataKey) -> Option<Arc<PagableData>> {
self.data.get(key).map(|v| v.dupe())
}
}
struct InMemoryPagableStorageHandle {
sender: std::sync::mpsc::Sender<Box<dyn ArcEraseDyn>>,
cache: InMemoryPagableStorageCache,
session_context: SessionContext,
}
impl InMemoryPagableStorage {
pub fn new() -> Self {
let (sender, receiver) = std::sync::mpsc::channel();
Self {
handle: Arc::new(InMemoryPagableStorageHandle {
sender,
cache: InMemoryPagableStorageCache::new(),
session_context: SessionContext::new(),
}),
pending: InMemoryPagableStoragePendingPageOut {
pending_messages: receiver,
pending: Vec::new(),
},
session_context: SessionContext::new(),
}
}
pub fn handle(&self) -> Arc<dyn PagableStorage> {
self.handle.dupe()
}
pub fn session_context(&self) -> &SessionContext {
&self.session_context
}
pub fn pending_paging_count(&mut self) -> usize {
while let Ok(v) = self.pending.pending_messages.try_recv() {
self.pending.pending.push(v);
}
self.pending.pending.len()
}
pub fn page_out_pending(&mut self) {
loop {
while let Ok(v) = self.pending.pending_messages.try_recv() {
self.pending.pending.push(v);
}
if let Some(v) = self.pending.pending.pop() {
if !v.needs_paging_out() {
continue;
}
let mut finished: HashMap<usize, DataKey> = HashMap::new();
enum Task {
Start(Box<dyn ArcEraseDyn>),
Finish((Box<dyn ArcEraseDyn>, Vec<u8>, Vec<Box<dyn ArcEraseDyn>>)),
}
let mut tasks = vec![Task::Start(v)];
while let Some(task) = tasks.pop() {
match task {
Task::Start(v) => {
if finished.contains_key(&v.identity()) {
continue;
}
let mut serializer = SerializerForPaging::new(&self.session_context);
v.serialize(&mut serializer).unwrap();
let (data, arcs) = serializer.finish();
let mut subtasks = vec![];
for arc in arcs.iter() {
if finished.contains_key(&arc.identity()) {
continue;
}
subtasks.push(Task::Start(arc.clone_dyn()));
}
tasks.push(Task::Finish((v, data, arcs)));
tasks.extend(subtasks);
}
Task::Finish((arc, data, serialized_arcs)) => {
let mut arcs: Vec<DataKey> = Vec::with_capacity(serialized_arcs.len());
for arc in serialized_arcs {
let key = *finished
.get(&arc.identity())
.expect("nested arc should have been serialized first");
arcs.push(key);
}
let data = PagableData { data, arcs };
let key = data.compute_key();
self.handle.cache.insert_data(key, Arc::new(data));
finished.insert(arc.identity(), key);
arc.set_data_key(key);
}
}
}
} else {
break;
}
}
}
}
impl Drop for InMemoryPagableStorage {
fn drop(&mut self) {
self.handle.cache.dropped()
}
}
impl Default for InMemoryPagableStorage {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl PagableStorage for InMemoryPagableStorageHandle {
fn arc_cache(&self) -> &DeserializedArcCache {
&self.cache.arcs
}
fn fetch_data_blocking(&self, key: &DataKey) -> anyhow::Result<Arc<PagableData>> {
self.cache
.get_data(key)
.ok_or_else(|| anyhow::anyhow!("no data for {:?}", key))
}
async fn fetch_data(&self, key: &DataKey) -> anyhow::Result<Arc<PagableData>> {
self.cache
.get_data(key)
.ok_or_else(|| anyhow::anyhow!("no data for key {:?}", key))
}
fn on_arc_deserialized(
&self,
typeid: TypeId,
key: DataKey,
arc: Box<dyn ArcEraseDyn>,
) -> Option<Box<dyn ArcEraseDyn>> {
if self.cache.is_dropped.load(Ordering::Acquire) {
Some(arc)
} else {
self.cache.arcs.on_arc_deserialized(typeid, key, arc)
}
}
fn schedule_for_paging(&self, arc: Box<dyn ArcEraseDyn>) {
drop(self.sender.send(arc));
}
fn session_context(&self) -> &SessionContext {
&self.session_context
}
fn store_data(&self, data: PagableData) -> anyhow::Result<DataKey> {
let key = data.compute_key();
self.cache.insert_data(key, Arc::new(data));
Ok(key)
}
}