1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
 * Copyright 2020 UT OVERSEAS INC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use std::{
    sync::{
        mpsc::{channel, Receiver, Sender},
        Arc, Mutex,
    },
    thread,
};

use crate::concurrent::{logbuffer::term_reader::ErrorHandler, strategies::Strategy};
use crate::utils::errors::AeronError;

/// The trait to be implemented by agents run within AgentRunner
pub trait Agent {
    fn on_start(&mut self) -> Result<(), AeronError>;
    fn do_work(&mut self) -> Result<i32, AeronError>;
    fn on_close(&mut self) -> Result<(), AeronError>;
}

pub struct AgentStopper {
    thread: Option<thread::JoinHandle<()>>,
    tx: Sender<bool>,
}

impl AgentStopper {
    pub fn new(thread: thread::JoinHandle<()>, tx: Sender<bool>) -> Self {
        Self {
            thread: Some(thread),
            tx,
        }
    }

    pub fn stop(&mut self) {
        self.tx.send(true).expect("Can't send stop command to AgentRunner");

        let _b = self.thread.take().unwrap().join();
    }
}

pub struct AgentRunner<
    A: 'static + std::marker::Send + std::marker::Sync + Agent,
    I: 'static + std::marker::Send + std::marker::Sync + Strategy,
> {
    agent: Arc<Mutex<A>>, // need mutable Agent here as AgentRunner will change Agent state while running it
    idle_strategy: Arc<I>,
    exception_handler: ErrorHandler,
    name: String,
}

impl<
        A: 'static + std::marker::Send + std::marker::Sync + Agent,
        I: 'static + std::marker::Send + std::marker::Sync + Strategy,
    > AgentRunner<A, I>
{
    pub fn new(agent: Arc<Mutex<A>>, idle_strategy: Arc<I>, exception_handler: ErrorHandler, name: &str) -> Self {
        Self {
            agent,
            idle_strategy,
            exception_handler,
            name: String::from(name),
        }
    }

    /**
     * Name given to the thread running the agent.
     *
     * @return the name given to the thread running the agent.
     */
    pub fn name(&self) -> String {
        self.name.clone()
    }

    pub fn set_name(&mut self, new_name: &str) {
        self.name = String::from(new_name);
    }

    /**
     * Start the Agent running
     *
     * Will spawn a std::thread.
     * Returns closure which is once called will shutdown the runner.
     */
    pub fn start(mut this: Self) -> Result<AgentStopper, AeronError> {
        let (tx, rx) = channel::<bool>();

        let th = thread::Builder::new().name(this.name.clone()).spawn(move || {
            this.run(rx);
        });

        if let Ok(handle) = th {
            Ok(AgentStopper::new(handle, tx))
        } else {
            Err(AeronError::GenericError(format!("Agent start failed: {:?}", th.err())))
        }
    }

    /**
     * Run the Agent duty cycle until closed
     */
    pub fn run(&mut self, stop_rx: Receiver<bool>) {
        if let Err(error) = self.agent.lock().expect("Mutex poisoned").on_start() {
            (self.exception_handler)(error);
        }

        loop {
            // Monitor message from main thread and be ready to finish the work
            if let Ok(time_to_stop) = stop_rx.try_recv() {
                if time_to_stop {
                    break;
                }
            }

            match self.agent.lock().expect("Mutex poisoned").do_work() {
                Ok(work_cnt) => self.idle_strategy.idle_opt(work_cnt),
                Err(error) => (self.exception_handler)(error),
            }
        }

        if let Err(error) = self.agent.lock().expect("Mutex poisoned").on_close() {
            (self.exception_handler)(error);
        }
    }
}