1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
//! The actor system is the container for the actor runtime. It is used to create and stop actors
//! and to perform other actor-related operations.

use crate::common::tsafe::TSafe;
use crate::actors::props::Props;
use crate::actors::actor_path::ActorPath;
use crate::actors::actor_cell::ActorCell;
use crate::actors::actor::PoisonPill;
use crate::actors::default_dispatcher::DefaultDispatcher;
use crate::actors::dead_letters::DeadLetters;
use crate::actors::synthetic_actor::SyntheticActor;
use crate::actors::unbound_mailbox::UnboundMailbox;
use crate::actors::actor_ref_factory::ActorRefFactory;
use crate::actors::abstract_actor_system::AbstractActorSystem;
use crate::actors::local_actor_ref::LocalActorRef;
use crate::actors::abstract_actor_ref::ActorRef;
use crate::actors::watcher::WatchingEvents;
use crate::actors::watcher::Watcher;
use crate::actors::scheduler::Scheduler;
use crate::actors::message::Message;
use std::sync::{Arc, Mutex};



/// Local actor system: holds the runtime services used by the actors it creates (default
/// dispatcher, dead-letters reference, scheduler and watcher event bus).
pub struct LocalActorSystem {
    /// Actor ids counter. It is used to generate an actor name when none was explicitly
    /// specified: the current value becomes the name and the counter is incremented.
    nids: usize,

    /// Default dispatcher. It is used when no other dispatcher is explicitly specified.
    pub dispatcher: TSafe<DefaultDispatcher>,

    /// Dead-letters actor reference. Sending a message through this reference is very cheap,
    /// because a message dropped into its mailbox is simply destroyed without being scheduled
    /// for execution. The field is `None` until `new` installs the reference and is reset to
    /// `None` by `terminate`.
    dead_letters: Option<ActorRef>,

    /// Internal tasks scheduler
    scheduler: TSafe<Scheduler>,

    /// Watcher event bus
    watcher: TSafe<Watcher>
}

impl LocalActorSystem {
    /// Builds a new actor system and returns it wrapped in a `TSafe` guard.
    ///
    /// The default dispatcher is created with the CPU count reported by `num_cpus::get()`.
    /// The dead-letters actor is set up here as well: its cell is built from a `SyntheticActor`
    /// and a `DeadLetters` mailbox under the path `"deadLetters"`, a reference to it is stored
    /// in the system, and the cell is started before the system is returned.
    pub fn new() -> TSafe<LocalActorSystem> {
        let default_dispatcher = tsafe!(DefaultDispatcher::new(num_cpus::get() as u32));

        // The dead-letters reference can only be filled in once the shared system handle
        // exists, because the dead-letters cell itself needs that handle.
        let bare_system = LocalActorSystem {
            nids: 0,
            dispatcher: default_dispatcher.clone(),
            dead_letters: None,
            scheduler: tsafe!(Scheduler::new()),
            watcher: tsafe!(Watcher::new())
        };
        let shared_system = tsafe!(bare_system);

        let dead_letters_path = tsafe!(ActorPath::new("deadLetters"));
        let dead_letters_mailbox = tsafe!(DeadLetters::new());
        let dead_letters_cell = tsafe!(ActorCell::new(
            shared_system.clone(),
            dead_letters_path.clone(),
            tsafe!(SyntheticActor {}),
            0,
            default_dispatcher.clone(),
            dead_letters_mailbox
        ));

        // Build the reference first, then take the system lock only for the assignment.
        let dead_letters_ref = LocalActorRef::new(dead_letters_cell.clone(), dead_letters_path);
        shared_system.lock().unwrap().dead_letters = Some(Box::new(dead_letters_ref));

        dead_letters_cell.lock().unwrap().start(dead_letters_cell.clone());

        shared_system
    }
}

// TODO: remove the `run` method from all ActorSystem implementations

impl ActorRefFactory for LocalActorSystem {
    /// Creates a new actor from the specified `Props` object and starts it.
    ///
    /// If `name` is `None`, the name is generated from the internal ids counter, which is then
    /// incremented. The actor's cell is built with a fresh `UnboundMailbox`, the system's
    /// default dispatcher and a clone of this system, and it is started before the reference
    /// to it is returned.
    fn actor_of(&mut self, props: Props, name: Option<&str>) -> ActorRef {
        let mailbox = tsafe!(UnboundMailbox::new());

        // An explicit name is used as is; otherwise the current counter value becomes the name.
        let aname = match name {
            Some(explicit) => explicit.to_string(),
            None => {
                let generated = self.nids.to_string();
                self.nids += 1;
                generated
            }
        };

        let path = tsafe!(ActorPath::new(&aname));

        // NOTE(review): the cell receives a clone of the system wrapped in a new guard, not the
        // guard returned by `new`; plain fields such as `nids` and `dead_letters` of that copy
        // will not follow later changes made to this instance - confirm this is intended.
        let cell = ActorCell::new(
            tsafe!(self.clone()),
            path.clone(),
            props.actor,
            0,
            self.dispatcher.clone(),
            mailbox
        );
        let boxed_cell = tsafe!(cell);

        boxed_cell.lock().unwrap().start(boxed_cell.clone());

        Box::new(LocalActorRef::new(boxed_cell, path))
    }

    /// Stops the specified actor by its reference. Suspends the actor, cleans its mailbox
    /// (passing the dead-letters reference to the clean-up) and force-sends it the `PoisonPill`
    /// message. Cancelling the actor's timers is not implemented yet (see the TODO below).
    ///
    /// # Panics
    ///
    /// Panics if the dead-letters reference is absent (for example after `terminate`) or if
    /// locking the cell or its mailbox fails.
    fn stop(&mut self, aref: &mut ActorRef) {
        // Attention, identical code exists in the PoisonPill handler

        // Resolve the dead-letters reference before any lock is taken: `dead_letters()` panics
        // when the reference is absent, and panicking while the cell and mailbox guards are
        // held would leave them poisoned (assuming `TSafe` wraps a `std::sync::Mutex`).
        let dead_letters = self.dead_letters();

        let aref_cpy0 = aref.clone();
        let aref_cpy1 = aref.clone();
        let x = aref.cell();
        let mut cell = x.lock().unwrap();
        cell.suspend();
        // TODO: cancel the actor's timers here (cell.actor.timers().cancelAll())
        cell.mailbox.lock().unwrap().clean_up(aref_cpy0, dead_letters);
        cell.force_send(aref.cell().clone(), msg!(PoisonPill {}), None, aref_cpy1);
    }

    /// Returns a clone of the dead-letters actor reference.
    ///
    /// # Panics
    ///
    /// Panics with "Dead letter is empty" if the reference is not set.
    fn dead_letters(&mut self) -> ActorRef {
        match &self.dead_letters {
            Some(d) => (*d).clone(),
            None => panic!("Dead letter is empty")
        }
    }

    /// Registers `watcher` to receive 'watching events' from the `observed` actor.
    fn watch(&mut self, watcher: &ActorRef, observed: &ActorRef) {
        self.watcher.lock().unwrap().watch(watcher, observed);
    }

    /// Unregisters `watcher` from receiving 'watching events' from the `observed` actor.
    fn unwatch(&mut self, watcher: &ActorRef, observed: &ActorRef) {
        self.watcher.lock().unwrap().unwatch(watcher, observed);
    }
}

impl AbstractActorSystem for LocalActorSystem {

    /// Hands out a clone of the system's scheduler handle.
    fn get_scheduler(&self) -> TSafe<Scheduler> {
        let scheduler = &self.scheduler;
        scheduler.clone()
    }

    /// Forwards a watching event emitted by the actor `from` to the watcher event bus.
    fn register_watch_event(&self, from: &ActorRef, event: WatchingEvents) {
        let bus = &self.watcher;
        bus.lock().unwrap().register_event(&from, event);
    }

    /// Terminates the actor system: stops the default dispatcher and then drops the
    /// dead-letters reference held by this instance.
    fn terminate(&mut self) {
        let dispatcher = &self.dispatcher;
        // The guard is a temporary, so the dispatcher lock is released before the
        // dead-letters reference is cleared.
        dispatcher.lock().unwrap().stop();

        self.dead_letters = None;
    }
}

impl Clone for LocalActorSystem {
    fn clone(&self) -> Self {

        let dead_letter: Option<ActorRef> = match &self.dead_letters {
            Some(v) => Some((*v).clone()),
            None => None
        };
        LocalActorSystem {
            nids: self.nids,
            dispatcher: self.dispatcher.clone(),
            dead_letters: dead_letter, //self.dead_letters.clone()
            scheduler: self.scheduler.clone(),
            watcher: self.watcher.clone()
        }
    }
}