use shared_mutex::{SharedMutex, SharedMutexReadGuard, SharedMutexWriteGuard,
MappedSharedMutexReadGuard,
MappedSharedMutexWriteGuard};
use shared_mutex::monitor::{Monitor, MonitorWriteGuard};
use std::mem;
use sparse::Index;
use {ContentId, Version};
/// A chunk that is either already frozen under a content id or still mutable.
#[derive(Debug)]
pub enum Chunk<'id> {
    /// A frozen chunk; only its content id is kept here.
    Immutable(ContentId),
    /// A chunk whose state is still tracked by a `MutableChunk`.
    Mutable(MutableChunk<'id>),
}
impl<'id> Chunk<'id> {
    /// Attempts to turn the chunk behind `this` into `Chunk::Immutable`.
    ///
    /// Returns `Err(id)` if the chunk is already immutable. Otherwise the call
    /// blocks in `MutableChunk::wait_for_freeze` until the mutable chunk is
    /// observed as `Stable`, then takes the monitor for writing and checks
    /// again, since the state may have changed between releasing the read
    /// guard and acquiring the write guard. If it is no longer `Stable` the
    /// chunk is put back as it was and the whole procedure is retried.
    ///
    /// On success the chunk is replaced by `Chunk::Immutable(id)` and the
    /// returned `FreezeGuard` carries the write guard, the index and the id.
    ///
    /// # Panics
    ///
    /// Panics if acquiring either guard or unwrapping the inner state fails
    /// (presumably because the lock was poisoned).
    pub fn freeze<'chunk>(this: &'chunk Monitor<Self>) -> Result<FreezeGuard<'chunk, 'id>, ContentId> {
        loop {
            // Phase 1: with a read guard, bail out on immutable chunks and
            // otherwise wait until the mutable chunk reports `Stable`.
            let shared = this.read().unwrap();
            match **shared {
                Chunk::Immutable(id) => return Err(id),
                Chunk::Mutable(ref mutable) => {
                    // Only the waiting matters; the state guard is released at once.
                    drop(mutable.wait_for_freeze());
                }
            }
            drop(shared);

            // Phase 2: with a write guard, move the chunk out (leaving a
            // placeholder behind) so its state can be taken by value.
            let mut exclusive = this.write().unwrap();
            let taken = mem::replace(&mut **exclusive, Chunk::Immutable(ContentId::null()));
            let mutable = match taken {
                Chunk::Immutable(id) => {
                    // Somebody else froze it in the meantime: undo the placeholder.
                    **exclusive = Chunk::Immutable(id);
                    return Err(id);
                }
                Chunk::Mutable(mutable) => mutable,
            };

            let MutableChunk { version, state } = mutable;
            let inner: SharedMutex<_> = state.into();
            match inner.into_inner().unwrap() {
                MutableChunkState::Stable(index, id) => {
                    **exclusive = Chunk::Immutable(id);
                    return Ok(FreezeGuard {
                        _guard: exclusive,
                        index: Some(index),
                        id: id,
                    });
                }
                other => {
                    // No longer stable: rebuild the mutable chunk with the
                    // same version and state, then start over.
                    **exclusive = Chunk::Mutable(MutableChunk {
                        version: version,
                        state: Monitor::new(other),
                    });
                }
            }
        }
    }

    /// Returns the loaded version of a mutable chunk, or `None` for an
    /// immutable one.
    pub fn version(&self) -> Option<usize> {
        if let Chunk::Mutable(ref mutable) = *self {
            Some(mutable.version.load())
        } else {
            None
        }
    }
}
/// Value returned by a successful `Chunk::freeze`.
pub struct FreezeGuard<'chunk, 'id>
where
    'id: 'chunk,
{
    /// Write guard on the chunk's monitor. It is never read; it is stored so
    /// that the guard lives exactly as long as this value does.
    _guard: MonitorWriteGuard<'chunk, Chunk<'id>>,
    /// Index of the frozen chunk; becomes `None` once `take_index` is called.
    index: Option<Index<'id>>,
    /// Content id the chunk was frozen under.
    id: ContentId,
}
impl<'chunk, 'id> FreezeGuard<'chunk, 'id> {
    /// Moves the index of the frozen chunk out of the guard.
    ///
    /// The index can be taken only once; afterwards the guard holds `None`.
    ///
    /// # Panics
    ///
    /// Panics if the index has already been taken from this guard.
    pub fn take_index(&mut self) -> Index<'id> {
        // `expect` instead of a bare `unwrap` so that a double call reports
        // the violated invariant rather than a generic `None` message.
        self.index
            .take()
            .expect("Logic error! FreezeGuard::take_index called after the index was already taken")
    }

    /// Returns the content id the chunk was frozen under.
    pub fn id(&self) -> ContentId {
        self.id
    }
}
/// A chunk that can still be written to: a version counter plus a monitored
/// state.
#[derive(Debug)]
pub struct MutableChunk<'id> {
    /// Counter incremented by `fill` and `complete_write`.
    version: Version,
    /// Current state; readers and writers wait on this monitor.
    state: Monitor<MutableChunkState<'id>>,
}
impl<'id> MutableChunk<'id> {
    /// Creates a chunk in the `Reserved` state for `id`, with a version
    /// constructed from 0.
    pub fn new(id: ContentId) -> Self {
        let version = Version::new(0);
        let state = Monitor::new(MutableChunkState::Reserved(id));
        MutableChunk { version: version, state: state }
    }

    /// Creates a chunk that already owns `index`, in the `Dirty` state, with
    /// a version constructed from 0.
    pub fn dirty(index: Index<'id>) -> Self {
        let version = Version::new(0);
        let state = Monitor::new(MutableChunkState::Dirty(index));
        MutableChunk { version: version, state: state }
    }

    /// Borrows the chunk's version counter.
    pub fn version(&self) -> &Version {
        &self.version
    }

    /// Supplies the index for a `Reserved` chunk, moving it to `Dirty`, and
    /// wakes everything waiting on the state monitor.
    ///
    /// Returns `increment() + 1` of the version counter (presumably the new
    /// version, assuming `increment` yields the previous value).
    ///
    /// # Panics
    ///
    /// Panics if the chunk is not in the `Reserved` state, or if the state
    /// lock cannot be acquired.
    pub fn fill(&self, index: Index<'id>) -> usize {
        let mut guard = self.state.write().unwrap();
        let next = match **guard {
            MutableChunkState::Reserved(_) => MutableChunkState::Dirty(index),
            ref other => panic!("Logic error! Wrong state on mutable chunk fill: {:?}", other),
        };
        **guard = next;
        self.state.notify_all();
        self.version.increment() + 1
    }

    /// Blocks until the chunk owns an index (`Dirty` or `Stable`) and returns
    /// a write guard mapped onto that index.
    ///
    /// While the chunk is still `Reserved`, the guard is recovered and the
    /// call waits on the monitor's condition variable before trying again.
    pub fn wait_for_write(&self) -> MappedSharedMutexWriteGuard<Index<'id>> {
        let mut guard: SharedMutexWriteGuard<_> = self.state.write().unwrap().into();
        loop {
            let attempt = guard.into_mapped().result_map(|state| match *state {
                MutableChunkState::Reserved(_) => Err(()),
                MutableChunkState::Dirty(ref mut index) |
                MutableChunkState::Stable(ref mut index, _) => Ok(index),
            });
            guard = match attempt {
                Ok(mapped) => return mapped,
                Err((unmapped, ())) => unmapped
                    .recover(&self.state.as_ref())
                    .unwrap()
                    .wait_for_write(self.state.cond())
                    .unwrap(),
            };
        }
    }

    /// Finishes a write started with `wait_for_write`.
    ///
    /// Takes back the mapped guard `m`, leaves the chunk `Dirty` (a `Stable`
    /// chunk loses its content id) and returns `increment() + 1` of the
    /// version counter.
    ///
    /// # Panics
    ///
    /// Panics if the state is found to be `Reserved`, or if the guard cannot
    /// be recovered for this chunk's state.
    pub fn complete_write(&self, m: MappedSharedMutexWriteGuard<Index<'id>>) -> usize {
        let mut guard = m.recover(&self.state.as_ref()).unwrap();
        // Move the state out through a placeholder so the index can be
        // reused by value.
        let previous = mem::replace(&mut *guard, MutableChunkState::Reserved(ContentId::null()));
        let index = match previous {
            MutableChunkState::Dirty(index) |
            MutableChunkState::Stable(index, _) => index,
            MutableChunkState::Reserved(_) =>
                panic!("Logic error! Reserved state observed during complete_write!"),
        };
        *guard = MutableChunkState::Dirty(index);
        self.version().increment() + 1
    }

    /// Blocks until the chunk owns an index (`Dirty` or `Stable`) and returns
    /// a read guard mapped onto that index.
    ///
    /// While the chunk is still `Reserved`, the guard is recovered and the
    /// call waits on the monitor's condition variable before trying again.
    pub fn wait_for_read(&self) -> MappedSharedMutexReadGuard<Index<'id>> {
        let mut guard: SharedMutexReadGuard<_> = self.state.read().unwrap().into();
        loop {
            let attempt = guard.into_mapped().result_map(|state| match *state {
                MutableChunkState::Reserved(_) => Err(()),
                MutableChunkState::Dirty(ref index) |
                MutableChunkState::Stable(ref index, _) => Ok(index),
            });
            guard = match attempt {
                Ok(mapped) => return mapped,
                Err((unmapped, ())) => unmapped
                    .recover(&self.state.as_ref())
                    .unwrap()
                    .wait_for_read(self.state.cond())
                    .unwrap(),
            };
        }
    }

    /// Blocks until the chunk is `Stable` and returns a read guard over the
    /// whole state.
    ///
    /// `Dirty` and `Reserved` both cause the call to wait on the monitor's
    /// condition variable and then look again.
    pub fn wait_for_freeze(&self) -> MappedSharedMutexReadGuard<MutableChunkState<'id>> {
        let mut guard: SharedMutexReadGuard<_> = self.state.read().unwrap().into();
        loop {
            let attempt = guard.into_mapped().result_map(|state| match *state {
                MutableChunkState::Stable(..) => Ok(state),
                MutableChunkState::Dirty(_) |
                MutableChunkState::Reserved(_) => Err(()),
            });
            guard = match attempt {
                Ok(mapped) => return mapped,
                Err((unmapped, _)) => unmapped
                    .recover(&self.state.as_ref())
                    .unwrap()
                    .wait_for_read(self.state.cond())
                    .unwrap(),
            };
        }
    }

    /// Records that the contents at `version` were flushed under `id`.
    ///
    /// Nothing happens if the chunk's version no longer equals `version`;
    /// this is checked once before and once after taking the write lock.
    /// Otherwise a `Dirty` chunk becomes `Stable(index, id)`, any other state
    /// is left as it was, and waiters on the monitor are notified.
    ///
    /// Every path visible here returns `Ok(())`.
    pub fn complete_flush(&self, id: ContentId, version: Version) -> ::Result<()> {
        let outdated = || self.version.load() != version.load();

        // Cheap check first, so an outdated flush never takes the lock.
        if outdated() {
            return Ok(());
        }
        let mut guard = self.state.write().unwrap();
        // Checked again now that the write lock is held.
        if outdated() {
            return Ok(());
        }

        let previous = mem::replace(&mut **guard, MutableChunkState::Reserved(ContentId::null()));
        **guard = match previous {
            MutableChunkState::Dirty(index) => MutableChunkState::Stable(index, id),
            unchanged => unchanged,
        };
        guard.notify_all();
        Ok(())
    }
}
/// States a `MutableChunk` moves through.
#[derive(Debug, PartialEq)]
pub enum MutableChunkState<'id> {
    /// No index yet; `MutableChunk::fill` turns this into `Dirty`.
    Reserved(ContentId),
    /// An index is present but has no content id attached;
    /// `MutableChunk::complete_flush` turns this into `Stable`.
    Dirty(Index<'id>),
    /// An index together with the content id recorded by
    /// `MutableChunk::complete_flush`.
    Stable(Index<'id>, ContentId),
}
/// An immutable chunk whose index may still be pending, present or evicted.
#[derive(Debug)]
pub struct ImmutableChunk<'id> {
    /// Current state; readers wait on this monitor until it leaves `Reserved`.
    state: Monitor<ImmutableChunkState<'id>>,
}
impl<'id> ImmutableChunk<'id> {
pub fn new() -> ImmutableChunk<'id> {
ImmutableChunk {
state: Monitor::new(ImmutableChunkState::Reserved)
}
}
pub fn from_mutable(index: Index<'id>) -> ImmutableChunk<'id> {
ImmutableChunk {
state: Monitor::new(ImmutableChunkState::Stable(index))
}
}
pub fn complete_fill(&self, index: Index<'id>) {
use self::ImmutableChunkState::*;
let mut lock = self.state.write().unwrap();
match **lock {
Reserved => **lock = Stable(index),
Stable(_) => panic!("Logic error - complete_fill called on Stable immutable chunk!"),
Evicted => panic!("Logic error - complete_fill called on Evicted immutable chunk!")
};
self.state.notify_all();
}
pub fn wait_for_read(&self) -> Option<MappedSharedMutexReadGuard<Index<'id>>> {
let mut lock: SharedMutexReadGuard<_> = self.state.read().unwrap().into();
loop {
match lock.into_mapped().result_map(|state| {
match *state {
ImmutableChunkState::Stable(ref index) => Ok(index),
ImmutableChunkState::Reserved => Err(true),
ImmutableChunkState::Evicted => Err(false)
}
}) {
Ok(lock) => return Some(lock),
Err((_, false)) => return None,
Err((l, true)) =>
lock = l.recover(&self.state.as_ref()).unwrap()
.wait_for_read(self.state.cond()).unwrap()
}
}
}
pub fn begin_evict(&self) -> Option<Option<Index<'id>>> {
use self::ImmutableChunkState::*;
let state: &SharedMutex<_> = self.state.as_ref();
state.try_write().ok().and_then(|mut state| {
match mem::replace(&mut *state, Evicted) {
Stable(index) => Some(Some(index)),
Reserved => {
*state = Reserved;
None
},
Evicted => Some(None),
}
})
}
}
/// States of an `ImmutableChunk`.
#[derive(Debug, PartialEq)]
pub enum ImmutableChunkState<'id> {
    /// No index yet; `ImmutableChunk::complete_fill` turns this into `Stable`.
    Reserved,
    /// The index was handed out by `ImmutableChunk::begin_evict`.
    Evicted,
    /// The index is present and can be read.
    Stable(Index<'id>),
}