1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::actors::abstract_actor_ref::ActorRef;
use crate::actors::actor_path::ActorPath;
use crate::actors::message::Message;
use std::collections::HashMap;
use std::sync::{Mutex, Arc};
pub enum WatchingEvents {
Terminated
}
pub mod events {
pub struct Terminated {}
}
pub struct Watcher {
feed: HashMap<ActorPath, (ActorRef, Vec<ActorRef>)>,
watchers: HashMap<ActorPath, u32>
}
impl Watcher {
pub fn new() -> Watcher {
Watcher {
feed: HashMap::new(),
watchers: HashMap::new()
}
}
pub fn watch(&mut self, watcher: &ActorRef, observed: &ActorRef) {
trace!("{} watch {}", watcher, observed);
let obs_path = observed.path().clone();
if self.feed.contains_key(&obs_path) {
let (_, watchers) = self.feed.get_mut(&obs_path).unwrap();
watchers.push((*watcher).clone());
} else {
self.feed.insert(obs_path, ((*observed).clone(), vec![(*watcher).clone()]));
}
let wt_path = watcher.path();
if self.watchers.contains_key(&wt_path) {
let counter = self.watchers.get_mut(&wt_path).unwrap();
*counter = *counter + 1;
} else {
self.watchers.insert(wt_path, 1);
}
}
pub fn unwatch(&mut self, watcher: &ActorRef, observed: &ActorRef) {
trace!("{} unwatch {}", watcher, observed);
let obs_path = observed.path().clone();
if self.feed.contains_key(&obs_path) {
let (_, watchers) = self.feed.get_mut(&obs_path).unwrap();
let mut new: Vec<ActorRef> = Vec::new();
let wat_path = watcher.path();
for w in watchers.iter() {
if w.path() != wat_path {
new.push((*w).clone());
}
}
if new.len() > 0 {
*watchers = new;
} else {
self.feed.remove(&obs_path);
}
}
let wt_path = watcher.path();
if self.watchers.contains_key(&wt_path) {
let counter = self.watchers.get_mut(&wt_path).unwrap();
*counter = *counter - 1;
if *counter <= 0 {
self.watchers.remove(&wt_path);
}
}
}
pub fn register_event(&mut self, from: &ActorRef, event: WatchingEvents) {
let obs_path = from.path().clone();
match event {
WatchingEvents::Terminated => {
trace!("Registered event 'Terminated' from {}", from);
if self.feed.contains_key(&obs_path) {
let (observed, watchers) =
self.feed.get_mut(&obs_path).unwrap();
for aref in watchers {
trace!("Send Terminated event message from {} to {}", from, aref);
aref.tell(msg!(events::Terminated {}), Some(&observed))
}
self.feed.remove(&obs_path);
}
let wt_path = from.path();
if self.watchers.contains_key(&wt_path) {
warn!("At the time of termination actor {} is subscribed for some events. You must prefer controlled unwatch!", from);
let mut fnv = Vec::new();
for (_, (obs, _)) in self.feed.iter_mut() {
fnv.push(obs.clone());
}
for obs in fnv {
self.unwatch(from, &obs);
}
}
}
}
}
}