use super::EventDispatcher;
use crate::event_api::request::{Request, Visitor};
use crate::{client::UserContext, datafile::Datafile};
use crate::{Conversion, Decision};
use std::sync::mpsc;
use std::thread;
/// Unit of work sent over the mpsc channel from the dispatcher to its
/// receiver thread.
struct ThreadMessage {
/// Visitor the event belongs to; built from the caller's `UserContext`
/// before the message is sent.
visitor: Visitor,
/// The event to add to the pending request.
event: EventEnum,
}
/// The kinds of event that can be queued for the receiver thread.
///
/// The receiver thread matches on the variant to decide whether to call
/// `add_conversion_event` or `add_decision_event` on the request.
enum EventEnum {
/// A conversion event to be added to the request.
Conversion(Conversion),
/// A decision event to be added to the request.
Decision(Decision),
}
/// Buffer size at which the receiver thread sends the accumulated request:
/// the request is sent once `Request::buffer_size()` is at least this value.
const DEFAULT_BATCH_THRESHOLD: usize = 10;
/// Event dispatcher that hands events to a background thread, which
/// accumulates them in a single request and sends that request once
/// `DEFAULT_BATCH_THRESHOLD` is reached.
pub struct BatchedEventDispatcher {
/// Handle of the background thread that receives messages.
/// Wrapped in `Option` so `Drop` can take ownership and join it.
receiver_thread: Option<thread::JoinHandle<()>>,
/// Sending half of the channel to the background thread.
/// Wrapped in `Option` so `Drop` can take and drop it before joining.
transmitter_channel: Option<mpsc::Sender<ThreadMessage>>,
}
impl BatchedEventDispatcher {
    /// Creates a dispatcher and spawns its receiver thread.
    ///
    /// A single `Request` is built from `datafile` and moved into the spawned
    /// thread. The thread adds every received event to that request and sends
    /// it whenever the buffer reaches `DEFAULT_BATCH_THRESHOLD`. When the
    /// channel is closed (all senders dropped), any events still buffered are
    /// sent before the thread exits.
    ///
    /// # Arguments
    ///
    /// * `datafile` - datafile used to construct the underlying `Request`.
    pub fn new(datafile: &Datafile) -> Self {
        let mut request = Request::new(datafile);
        let (transmitter_channel, receiver_channel) = mpsc::channel::<ThreadMessage>();

        let receiver_thread = thread::spawn(move || {
            // Blocks waiting for messages; the iterator ends once every
            // sender has been dropped.
            for message in receiver_channel.iter() {
                let ThreadMessage { visitor, event } = message;

                match event {
                    EventEnum::Conversion(conversion) => {
                        request.add_conversion_event(visitor, conversion);
                    }
                    EventEnum::Decision(decision) => {
                        request.add_decision_event(visitor, decision);
                    }
                }

                if request.buffer_size() >= DEFAULT_BATCH_THRESHOLD {
                    log::debug!("Reached DEFAULT_BATCH_THRESHOLD");
                    request.send();
                }
            }

            // The channel is closed: flush whatever is left, otherwise events
            // below the batch threshold would be lost on shutdown.
            // NOTE(review): assumes `Request::send` empties the buffer, so a
            // non-zero size here means unsent events - confirm.
            if request.buffer_size() > 0 {
                log::debug!("Sending remaining events before shutdown");
                request.send();
            }
        });

        BatchedEventDispatcher {
            receiver_thread: Some(receiver_thread),
            transmitter_channel: Some(transmitter_channel),
        }
    }
}
impl EventDispatcher for BatchedEventDispatcher {
    /// Queues a conversion event for `user_context` on the receiver thread.
    fn send_conversion_event(&self, user_context: &UserContext, conversion: Conversion) {
        let event = EventEnum::Conversion(conversion);
        self.transmit(user_context, event);
    }

    /// Queues a decision event for `user_context` on the receiver thread.
    fn send_decision_event(&self, user_context: &UserContext, decision: Decision) {
        let event = EventEnum::Decision(decision);
        self.transmit(user_context, event);
    }
}
impl Drop for BatchedEventDispatcher {
    /// Shuts the dispatcher down: closes the channel, then waits for the
    /// receiver thread to exit.
    fn drop(&mut self) {
        // Dropping the sender first closes the channel, which lets the
        // receiver thread's message loop terminate; joining before this
        // would block forever.
        drop(self.transmitter_channel.take());

        // Join only if the handle is still present; a join error is logged
        // rather than propagated.
        let join_outcome = self.receiver_thread.take().map(|handle| handle.join());
        if let Some(Err(_)) = join_outcome {
            log::error!("Failed to wait for receiver thread");
        }
    }
}
impl BatchedEventDispatcher {
fn transmit(&self, user_context: &UserContext, event: EventEnum) {
let visitor = Visitor::from(user_context);
let message = ThreadMessage { visitor, event };
match &self.transmitter_channel {
Some(channel) => match channel.send(message) {
Ok(_) => {
log::debug!("Successfully sent message to thread");
}
Err(_) => {
log::error!("Failed to send message to thread");
}
},
None => {
log::error!("Transmitter already dropped");
}
}
}
}