1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use super::*;
use subject::SubjectType;
use futures::{Future, Sink};
impl Slab {
pub fn dispatch_memoref (&self, memoref : MemoRef){
if let Some(subject_id) = memoref.subject_id {
if let SubjectType::IndexNode = subject_id.stype {
let mut senders = self.index_subscriptions.lock().unwrap();
let len = senders.len();
for i in (0..len).rev() {
if let Err(_) = senders[i].clone().send(memoref.to_head()).wait(){
senders.swap_remove(i);
}
}
}
if let Some(ref mut senders) = self.subject_subscriptions.lock().unwrap().get_mut( &subject_id ) {
let len = senders.len();
for i in (0..len).rev() {
match senders[i].clone().send(memoref.to_head()).wait() {
Ok(..) => { }
Err(_) => {
senders.swap_remove(i);
}
}
}
}
}
}
pub fn handle_memo_from_other_slab( &self, memo: &Memo, memoref: &MemoRef, origin_slabref: &SlabRef ){
match memo.body {
MemoBody::SlabPresence{ p: ref presence, r: ref root_index_seed } => {
match root_index_seed {
&MemoRefHead::Subject{..} | &MemoRefHead::Anonymous{..} => {
for memoref in root_index_seed.iter() {
memoref.update_peer(origin_slabref, MemoPeeringStatus::Resident);
}
self.net.apply_root_index_seed( &presence, root_index_seed, &self.my_ref );
}
&MemoRefHead::Null => {}
}
let mut reply = false;
if let &MemoRefHead::Null = root_index_seed {
reply = true;
}
if reply {
if let Ok(mentioned_slabref) = self.slabref_from_presence( presence ) {
let my_presence_memoref = self.new_memo_basic(
None,
memoref.to_head(),
MemoBody::SlabPresence{
p: self.presence_for_origin( origin_slabref ),
r: self.get_root_index_seed()
}
);
origin_slabref.send( &self.my_ref, &my_presence_memoref );
let _ = mentioned_slabref;
}
}
}
MemoBody::Peering(memo_id, subject_id, ref peerlist ) => {
let (peered_memoref,_had_memo) = self.assert_memoref( memo_id, subject_id, peerlist.clone(), None );
for peer in peerlist.iter().filter(|p| p.slabref.0.slab_id != self.id ) {
peered_memoref.update_peer( &peer.slabref, peer.status.clone());
}
if 0 == peered_memoref.want_peer_count() {
let mut q = self.peering_remediation_queue.lock().unwrap();
q.retain(|mr| mr != &peered_memoref )
}
},
MemoBody::MemoRequest(ref desired_memo_ids, ref requesting_slabref ) => {
if requesting_slabref.0.slab_id != self.id {
for desired_memo_id in desired_memo_ids {
if let Some(desired_memoref) = self.memorefs_by_id.read().unwrap().get(&desired_memo_id) {
if desired_memoref.is_resident() {
requesting_slabref.send(&self.my_ref, desired_memoref)
} else {
self.do_peering(&memoref,requesting_slabref);
}
}else{
let peering_memoref = self.new_memo(
None,
memoref.to_head(),
MemoBody::Peering(
*desired_memo_id,
None,
MemoPeerList::new(vec![MemoPeer{
slabref: self.my_ref.clone(),
status: MemoPeeringStatus::NonParticipating
}])
)
);
requesting_slabref.send(&self.my_ref, &peering_memoref)
}
}
}
}
_ => {}
}
}
}