use slab::*;
use core;
use std::fmt;
use std::collections::HashMap;
use std::sync::{Arc,RwLock};
use memorefhead::*;
use context::Context;
use error::*;
use futures::{Stream,Sink,Future};
use futures::sync::mpsc::channel;
/// Numeric limit (256) named for the maximum number of relations per subject.
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably it bounds the relation slot ids used by `RelationSet` — confirm
/// against the code that consumes it.
pub const SUBJECT_MAX_RELATIONS : usize = 256;
/// The kind of a subject.
///
/// Variant order is significant: the derived `Ord`/`PartialOrd` sort
/// `IndexNode` before `Record`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize)]
pub enum SubjectType {
    /// A subject that is a node of an index; its head is applied directly to
    /// the context when it changes.
    IndexNode,
    /// A regular record subject; it is registered in the context's root index
    /// when it changes.
    Record,
}
/// Identifier of a subject: a numeric id paired with the subject's type.
///
/// Field order is significant for the derived ordering: ids compare by `id`
/// first and by `stype` second.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize)]
pub struct SubjectId {
    /// Numeric part of the identifier.
    pub id: u64,
    /// Which kind of subject this id refers to.
    pub stype: SubjectType,
}
/// Allows comparing a `SubjectId` directly against a string slice.
///
/// The id is equal to the string when the string matches the id's
/// `concise_string()` rendering exactly (for example `"R7"` or `"I3"`).
impl<'a> core::cmp::PartialEq<&'a str> for SubjectId {
    fn eq(&self, other: &&'a str) -> bool {
        let rendered = self.concise_string();
        rendered.as_str() == *other
    }
}
impl SubjectId {
pub fn test(test_id: u64) -> Self{
SubjectId{
id: test_id,
stype: SubjectType::Record
}
}
pub fn index_test(test_id: u64) -> Self{
SubjectId{
id: test_id,
stype: SubjectType::IndexNode
}
}
pub fn concise_string (&self) -> String {
use self::SubjectType::*;
match self.stype {
IndexNode => format!("I{}", self.id),
Record => format!("R{}", self.id)
}
}
}
/// Human-readable rendering: the `Debug` form of the subject type, a dash,
/// then the numeric id (for example `Record-7`).
impl fmt::Display for SubjectId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{:?}-{}", self.stype, self.id))
    }
}
/// A handle to a subject: its id plus a shared, lock-protected head.
///
/// The head lives behind an `Arc<RwLock<..>>`, so every clone of a `Subject`
/// reads and updates the same head.
pub(crate) struct Subject {
    /// Identifier of this subject.
    pub id: SubjectId,
    /// Shared head for this subject, guarded by a read/write lock.
    pub(crate) head: Arc<RwLock<MemoRefHead>>,
}
impl Subject {
    /// Creates a brand-new subject of type `stype` holding the initial values `vals`.
    ///
    /// A fresh subject id is generated by the context's slab, and a parentless
    /// `FullyMaterialized` memo is created carrying `vals`, an empty relation
    /// set, an empty edge set and the subject type. That memo becomes the
    /// subject's head, after which the subject is announced to the context.
    ///
    /// # Errors
    /// Returns the `WriteError` produced while announcing the subject to the
    /// context (index insertion or head application).
    ///
    /// # Panics
    /// Panics if the head lock is poisoned.
    pub fn new(context: &Context, stype: SubjectType, vals: HashMap<String, String>) -> Result<Self, WriteError> {
        let slab = &context.slab;
        let id = slab.generate_subject_id(stype);

        let memoref = slab.new_memo_basic_noparent(
            Some(id),
            MemoBody::FullyMaterialized {
                v: vals,
                r: RelationSet::empty(),
                e: EdgeSet::empty(),
                // `SubjectType` is `Copy`; no clone needed.
                t: stype,
            },
        );

        // The head is moved straight into the lock; it is not needed afterwards.
        let head = memoref.to_head();
        let subject = Subject { id, head: Arc::new(RwLock::new(head)) };

        subject.update_referents(context)?;
        Ok(subject)
    }

    /// Tells the context about the current state of this subject.
    ///
    /// Index nodes have their head applied directly to the context; records
    /// are (re)inserted into the context's root index.
    ///
    /// Callers must not hold the write lock on `self.head` when calling this:
    /// the index-node branch takes a read lock on it.
    fn update_referents(&self, context: &Context) -> Result<(), WriteError> {
        match self.id.stype {
            SubjectType::IndexNode => {
                let head = self.head.read().unwrap();
                context.apply_head(&*head)?;
            }
            SubjectType::Record => {
                context.insert_into_root_index(self.id, self)?;
            }
        }
        Ok(())
    }

    /// Rebuilds a `Subject` handle around an existing head.
    ///
    /// The context argument is currently unused.
    ///
    /// # Errors
    /// Returns `RetrieveError::InvalidMemoRefHead` when `head` carries no
    /// subject id.
    pub fn reconstitute(_context: &Context, head: MemoRefHead) -> Result<Subject, RetrieveError> {
        match head.subject_id() {
            Some(subject_id) => Ok(Subject {
                id: subject_id,
                head: Arc::new(RwLock::new(head)),
            }),
            None => Err(RetrieveError::InvalidMemoRefHead),
        }
    }

    /// Looks up the value stored under `key`.
    ///
    /// The context's relevant head for this subject is first merged into the
    /// subject's own head, then the value is projected from the merged head.
    ///
    /// # Panics
    /// Panics if the head lock is poisoned.
    pub fn get_value(&self, context: &Context, key: &str) -> Result<Option<String>, RetrieveError> {
        let chead = context.get_relevant_subject_head(self.id)?;
        self.head.write().unwrap().apply(&chead, &context.slab)?;
        self.head.read().unwrap().project_value(&context.slab, key)
    }

    /// Resolves the relation stored in slot `key` to a subject, if any.
    ///
    /// The context's resident head for this subject is merged into the
    /// subject's own head before projecting the relation.
    ///
    /// # Panics
    /// Panics if the head lock is poisoned.
    pub fn get_relation(&self, context: &Context, key: RelationSlotId) -> Result<Option<Subject>, RetrieveError> {
        // Ask the context *before* taking the write lock, as `get_value` does,
        // so no call into the context happens while this head is locked.
        let resident_head = context.get_resident_subject_head(self.id);
        self.head.write().unwrap().apply(&resident_head, &context.slab)?;

        // Bind the projection first: using it directly as the match scrutinee
        // would keep the read guard alive across `context.get_subject(..)`.
        let related_id = self.head.read().unwrap().project_relation(&context.slab, key)?;
        match related_id {
            Some(subject_id) => context.get_subject(subject_id),
            None => Ok(None),
        }
    }

    /// Resolves the edge stored in slot `key` to a subject, if any.
    ///
    /// The edge's head is obtained via `get_edge_head` and then turned into a
    /// subject by the context.
    pub fn get_edge(&self, context: &Context, key: RelationSlotId) -> Result<Option<Subject>, RetrieveError> {
        match self.get_edge_head(context, key)? {
            Some(head) => Ok(Some(context.get_subject_with_head(head)?)),
            None => Ok(None),
        }
    }

    /// Returns the head stored in edge slot `key`, if any.
    ///
    /// The context's resident head for this subject is merged into the
    /// subject's own head before projecting the edge.
    ///
    /// # Panics
    /// Panics if the head lock is poisoned.
    pub fn get_edge_head(&self, context: &Context, key: RelationSlotId) -> Result<Option<MemoRefHead>, RetrieveError> {
        // Fetch from the context before locking; see `get_relation`.
        let resident_head = context.get_resident_subject_head(self.id);
        self.head.write().unwrap().apply(&resident_head, &context.slab)?;
        self.head.read().unwrap().project_edge(&context.slab, key)
    }

    /// Records `key = value` on this subject.
    ///
    /// An `Edit` memo whose parent is the current head is created and applied
    /// to the head, then the context is notified.
    ///
    /// Returns `Ok(true)` whenever the write succeeds.
    ///
    /// # Panics
    /// Panics if the head lock is poisoned.
    pub fn set_value(&self, context: &Context, key: &str, value: &str) -> Result<bool, WriteError> {
        let mut vals = HashMap::new();
        vals.insert(key.to_string(), value.to_string());

        let slab = &context.slab;
        {
            // Scoped so the write guard is released before `update_referents`,
            // which may need to read-lock the same head.
            let mut head = self.head.write().unwrap();
            let memoref = slab.new_memo_basic(Some(self.id), head.clone(), MemoBody::Edit(vals));
            head.apply_memoref(&memoref, slab)?;
        }

        self.update_referents(context)?;
        Ok(true)
    }

    /// Points relation slot `key` at `relation` (by subject id).
    ///
    /// A `Relation` memo whose parent is the current head is created and
    /// applied to the head, then the context is notified.
    ///
    /// # Panics
    /// Panics if the head lock is poisoned.
    pub fn set_relation(&self, context: &Context, key: RelationSlotId, relation: &Self) -> Result<(), WriteError> {
        let mut relationset = RelationSet::empty();
        relationset.insert(key, relation.id);

        let slab = &context.slab;
        {
            // Release the write guard before notifying the context.
            let mut head = self.head.write().unwrap();
            let memoref = slab.new_memo(Some(self.id), head.clone(), MemoBody::Relation(relationset));
            head.apply_memoref(&memoref, slab)?;
        }

        self.update_referents(context)
    }

    /// Points edge slot `key` at the current head of `edge`.
    ///
    /// An `Edge` memo whose parent is the current head is created and applied
    /// to the head, then the context is notified.
    ///
    /// # Panics
    /// Panics if either subject's head lock is poisoned.
    pub fn set_edge(&self, context: &Context, key: RelationSlotId, edge: &Self) -> Result<(), WriteError> {
        let mut edgeset = EdgeSet::empty();
        // Snapshot the target's head before locking our own head.
        edgeset.insert(key, edge.get_head());

        let slab = &context.slab;
        {
            // Release the write guard before notifying the context.
            let mut head = self.head.write().unwrap();
            let memoref = slab.new_memo(Some(self.id), head.clone(), MemoBody::Edge(edgeset));
            head.apply_memoref(&memoref, slab)?;
        }

        self.update_referents(context)
    }

    /// Returns a snapshot (clone) of the subject's current head.
    ///
    /// # Panics
    /// Panics if the head lock is poisoned.
    pub fn get_head(&self) -> MemoRefHead {
        self.head.read().unwrap().clone()
    }

    /// Collects the ids of every memo yielded by the head's causal memo iterator.
    ///
    /// # Panics
    /// Panics if any memo cannot be retrieved, or if the head lock is poisoned.
    pub fn get_all_memo_ids(&self, slab: &Slab) -> Vec<MemoId> {
        self.get_head()
            .causal_memo_iter(slab)
            .map(|m| m.expect("Memo retrieval error. TODO: Update to use Result<..,RetrieveError>").id)
            .collect()
    }

    /// Returns a stream of heads for this subject.
    ///
    /// The stream is backed by a channel of capacity 1. The subject's current
    /// head is queued first; the sending half is then registered with the slab
    /// via `observe_subject`. Every head that passes through the stream is
    /// also applied to this subject's shared head before being yielded.
    ///
    /// # Panics
    /// Panics if the initial send fails or if the head lock is poisoned.
    pub fn observe(&self, slab: &Slab) -> Box<Stream<Item = MemoRefHead, Error = ()>> {
        let (tx, rx) = channel::<MemoRefHead>(1);

        let head = Arc::clone(&self.head);
        let slab_clone = slab.clone();
        let rx2 = rx.map(move |mrh| {
            // Best effort: a failure to fold the observed head into our own
            // head is deliberately ignored so the stream keeps flowing.
            let _ = head.write().unwrap().apply(&mrh, &slab_clone);
            mrh
        });

        // Snapshot first so no read guard is held across the blocking send.
        let initial = self.get_head();
        tx.clone().send(initial).wait().unwrap();

        slab.observe_subject(self.id, tx);
        Box::new(rx2)
    }
}
impl Clone for Subject {
fn clone (&self) -> Subject {
Self{
id: self.id,
head: self.head.clone()
}
}
}
/// Destructor hook for `Subject`; currently performs no work.
///
/// Note that the mere presence of a `Drop` impl prevents moving fields out of
/// a `Subject` by destructuring.
impl Drop for Subject {
// Intentionally empty: the fields are dropped automatically afterwards.
fn drop (&mut self) {
}
}
/// Debug rendering as a struct named `Subject` with two fields:
/// `subject_id` (the id) and `head` (the lock-wrapped head).
impl fmt::Debug for Subject {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("Subject");
        out.field("subject_id", &self.id);
        out.field("head", &self.head);
        out.finish()
    }
}