1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use super::*;
/// Lets a `Slab` handle be used wherever a `&SlabInner` is expected by
/// dereferencing through the value it wraps in field `0`.
impl Deref for Slab {
    type Target = SlabInner;

    /// Borrows the wrapped `SlabInner`.
    fn deref(&self) -> &SlabInner {
        // Deref coercion walks through the wrapper held in `self.0`.
        let inner: &SlabInner = &self.0;
        inner
    }
}
impl Slab {
/// Creates a slab attached to `net`.
///
/// The steps, in order:
/// 1. obtain a fresh slab id from the network;
/// 2. build the slab's own `SlabRef`, using a blackhole transmitter and a
///    `TransportAddress::Local` return address;
/// 3. build the `SlabInner` with empty maps and the initial counters;
/// 4. register the slab with the network;
/// 5. spawn the memoref dispatch thread and store its join handle;
/// 6. ask the network to conditionally generate the root index seed.
///
/// # Panics
/// Panics if the `memoref_dispatch_thread` lock is poisoned.
pub fn new(net: &Network) -> Slab {
    let slab_id = net.generate_slab_id();

    // Reference this slab holds to itself; it is both subject and owner.
    let my_ref = SlabRef(Arc::new(SlabRefInner {
        slab_id,
        owning_slab_id: slab_id,
        presence: RwLock::new(vec![]),
        tx: Mutex::new(Transmitter::new_blackhole(slab_id)),
        return_address: RwLock::new(TransportAddress::Local),
    }));

    // The sending half is stored in the slab; the receiving half is moved
    // into the dispatch thread spawned below.
    let (dispatch_tx, dispatch_rx) = mpsc::channel::<MemoRef>();

    let initial_counters = SlabCounters {
        last_memo_id: 5000,
        last_subject_id: 9000,
        memos_received: 0,
        memos_redundantly_received: 0,
    };

    let me = Slab(Arc::new(SlabInner {
        id: slab_id,
        memorefs_by_id: RwLock::new(HashMap::new()),
        memo_wait_channels: Mutex::new(HashMap::new()),
        subject_subscriptions: RwLock::new(HashMap::new()),
        counters: RwLock::new(initial_counters),
        memoref_dispatch_tx_channel: Some(Mutex::new(dispatch_tx)),
        memoref_dispatch_thread: RwLock::new(None),
        my_ref,
        peer_refs: RwLock::new(Vec::new()),
        net: net.clone(),
        dropping: false,
    }));
    net.register_local_slab(&me);

    // The thread only holds a weak handle, so it does not by itself keep the
    // slab alive. Memorefs received while the slab cannot be upgraded are
    // dropped; the loop ends once `recv` on the channel reports an error.
    let weak_self = me.weak();
    let dispatcher = thread::spawn(move || {
        for memoref in dispatch_rx.iter() {
            if let Some(slab) = weak_self.upgrade() {
                slab.dispatch_memoref(memoref);
            }
        }
    });
    *me.memoref_dispatch_thread.write().unwrap() = Some(dispatcher);

    net.conditionally_generate_root_index_seed(&me);
    me
}
/// Returns a `WeakSlab` carrying this slab's id and a downgraded (weak)
/// pointer to the shared inner value.
pub fn weak (&self) -> WeakSlab {
    let inner = Arc::downgrade(&self.0);
    WeakSlab { id: self.id, inner }
}
/// Asks this slab's network for the root index seed, passing the slab itself
/// as the argument, and returns whatever the network answers.
pub fn get_root_index_seed (&self) -> Option<MemoRefHead> {
    let seed = self.net.get_root_index_seed(self);
    seed
}
pub fn create_context (&self) -> Context {
Context::new(self)
}
pub fn subscribe_subject (&self, subject_id: u64, context: &Context) {
let weakcontext : WeakContext = context.weak();
match self.subject_subscriptions.write().unwrap().entry(subject_id){
Entry::Occupied(mut e) => {
e.get_mut().push(weakcontext)
}
Entry::Vacant(e) => {
e.insert(vec![weakcontext]);
}
}
return;
}
/// Filters the subscriber list of `subject_id` against `context`.
///
/// Does nothing when the subject has no subscriber list. Otherwise the list
/// is filtered in place, keeping exactly those entries for which
/// `WeakContext::cmp` against `context`'s weak handle returns `true`.
///
/// NOTE(review): `WeakContext::cmp` is defined elsewhere. This only removes
/// `context` if `cmp` yields `true` for handles that are *different* from the
/// argument; confirm that, because the opposite semantics would keep only the
/// caller's own subscription.
///
/// # Panics
/// Panics if the `subject_subscriptions` lock is poisoned.
pub fn unsubscribe_subject (&self, subject_id: u64, context: &Context ){
    let mut subscriptions = self.subject_subscriptions.write().unwrap();
    let subscribers = match subscriptions.get_mut(&subject_id) {
        Some(list) => list,
        None => return,
    };

    let target = context.weak();
    subscribers.retain(|candidate| candidate.cmp(&target));
}
pub fn memo_wait_channel (&self, memo_id: MemoId ) -> mpsc::Receiver<Memo> {
let (tx, rx) = channel::<Memo>();
match self.memo_wait_channels.lock().unwrap().entry(memo_id) {
Entry::Vacant(o) => { o.insert( vec![tx] ); }
Entry::Occupied(mut o) => { o.get_mut().push(tx); }
};
rx
}
/// Produces the next subject id for this slab.
///
/// Increments `last_subject_id` under the counters write lock, then ORs the
/// slab id (as `u64`, rotated left by 32 bits) with the new counter value
/// (as `u64`).
///
/// # Panics
/// Panics if the `counters` lock is poisoned.
pub fn generate_subject_id(&self) -> SubjectId {
    let mut guard = self.counters.write().unwrap();
    guard.last_subject_id += 1;

    let slab_bits = (self.id as u64).rotate_left(32);
    let serial_bits = guard.last_subject_id as u64;
    slab_bits | serial_bits
}
}
impl WeakSlab {
pub fn upgrade (&self) -> Option<Slab> {
match self.inner.upgrade() {
Some(i) => Some( Slab(i) ),
None => None
}
}
}