use crossbeam_channel::{bounded, Sender};
use std::sync::Once;
use std::thread;
use thiserror::Error;
use super::dispatcher::{EventDispatcher, EventDispatcherMessage};
use super::event::InputEvent;
use super::EventsConfiguration;
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum EventProcessorError {
    #[error(transparent)]
    SpawnFailed(#[from] std::io::Error),
}
pub trait EventProcessor: Send + Sync {
    fn send(&self, event: InputEvent);
    fn flush(&self);
    fn close(&self);
}
pub struct NullEventProcessor {}
impl NullEventProcessor {
    pub fn new() -> Self {
        Self {}
    }
}
impl EventProcessor for NullEventProcessor {
    fn send(&self, _: InputEvent) {}
    fn flush(&self) {}
    fn close(&self) {}
}
pub struct EventProcessorImpl {
    inbox_tx: Sender<EventDispatcherMessage>,
    inbox_full_once: Once,
}
impl EventProcessorImpl {
    pub fn new(events_configuration: EventsConfiguration) -> Result<Self, EventProcessorError> {
        let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
        let mut dispatcher = EventDispatcher::new(events_configuration);
        match thread::Builder::new().spawn(move || dispatcher.start(inbox_rx)) {
            Ok(_) => Ok(Self {
                inbox_tx,
                inbox_full_once: Once::new(),
            }),
            Err(e) => Err(EventProcessorError::SpawnFailed(e)),
        }
    }
}
impl EventProcessor for EventProcessorImpl {
    fn send(&self, event: InputEvent) {
        if self
            .inbox_tx
            .try_send(EventDispatcherMessage::EventMessage(event))
            .is_err()
        {
            self.inbox_full_once.call_once(|| {
                warn!("Events are being produced faster than they can be processed; some events will be dropped")
            });
        }
    }
    fn flush(&self) {
        let _ = self.inbox_tx.try_send(EventDispatcherMessage::Flush);
    }
    fn close(&self) {
        let (sender, receiver) = bounded::<()>(1);
        if self.inbox_tx.send(EventDispatcherMessage::Flush).is_err() {
            error!("Failed to send final flush message. Cannot stop event processor");
            return;
        }
        if self
            .inbox_tx
            .send(EventDispatcherMessage::Close(sender))
            .is_err()
        {
            error!("Failed to send close message. Cannot stop event processor");
            return;
        }
        let _ = receiver.recv();
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use launchdarkly_server_sdk_evaluation::{ContextBuilder, Detail, Flag, FlagValue, Reason};
    use test_case::test_case;
    use crate::{
        events::{
            create_event_sender, create_events_configuration,
            event::{EventFactory, OutputEvent},
        },
        test_common::basic_flag,
    };
    use super::*;
    #[test]
    fn calling_close_on_processor_twice_returns() {
        let (event_sender, _) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
        event_processor.close();
        event_processor.close();
    }
    #[test_case(true, vec!["index", "feature", "summary"])]
    #[test_case(false, vec!["index", "summary"])]
    fn sending_feature_event_emits_correct_events(
        flag_track_events: bool,
        expected_event_types: Vec<&str>,
    ) {
        let mut flag = basic_flag("flag");
        flag.track_events = flag_track_events;
        let context = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");
        let detail = Detail {
            value: Some(FlagValue::from(false)),
            variation_index: Some(1),
            reason: Reason::Fallthrough {
                in_experiment: false,
            },
        };
        let event_factory = EventFactory::new(true);
        let feature_request = event_factory.new_eval_event(
            &flag.key,
            context,
            &flag,
            detail,
            FlagValue::from(false),
            None,
        );
        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
        event_processor.send(feature_request);
        event_processor.flush();
        event_processor.close();
        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
        assert_eq!(expected_event_types.len(), events.len());
        for event_type in expected_event_types {
            assert!(events.iter().any(|e| e.kind() == event_type));
        }
    }
    #[test]
    fn sending_feature_event_with_rule_track_events_emits_feature_and_summary() {
        let flag: Flag = serde_json::from_str(
            r#"{
                 "key": "with_rule",
                 "on": true,
                 "targets": [],
                 "prerequisites": [],
                 "rules": [
                   {
                     "id": "rule-0",
                     "clauses": [{
                       "attribute": "key",
                       "negate": false,
                       "op": "matches",
                       "values": ["do-track"]
                     }],
                     "trackEvents": true,
                     "variation": 1
                   },
                   {
                     "id": "rule-1",
                     "clauses": [{
                       "attribute": "key",
                       "negate": false,
                       "op": "matches",
                       "values": ["no-track"]
                     }],
                     "trackEvents": false,
                     "variation": 1
                   }
                 ],
                 "fallthrough": {"variation": 0},
                 "trackEventsFallthrough": true,
                 "offVariation": 0,
                 "clientSideAvailability": {
                   "usingMobileKey": false,
                   "usingEnvironmentId": false
                 },
                 "salt": "kosher",
                 "version": 2,
                 "variations": [false, true]
               }"#,
        )
        .expect("flag should parse");
        let context_track_rule = ContextBuilder::new("do-track")
            .build()
            .expect("Failed to create context");
        let context_notrack_rule = ContextBuilder::new("no-track")
            .build()
            .expect("Failed to create context");
        let context_fallthrough = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");
        let detail_track_rule = Detail {
            value: Some(FlagValue::from(true)),
            variation_index: Some(1),
            reason: Reason::RuleMatch {
                rule_index: 0,
                rule_id: "rule-0".into(),
                in_experiment: false,
            },
        };
        let detail_notrack_rule = Detail {
            value: Some(FlagValue::from(true)),
            variation_index: Some(1),
            reason: Reason::RuleMatch {
                rule_index: 1,
                rule_id: "rule-1".into(),
                in_experiment: false,
            },
        };
        let detail_fallthrough = Detail {
            value: Some(FlagValue::from(false)),
            variation_index: Some(0),
            reason: Reason::Fallthrough {
                in_experiment: false,
            },
        };
        let event_factory = EventFactory::new(true);
        let fre_track_rule = event_factory.new_eval_event(
            &flag.key,
            context_track_rule,
            &flag,
            detail_track_rule,
            FlagValue::from(false),
            None,
        );
        let fre_notrack_rule = event_factory.new_eval_event(
            &flag.key,
            context_notrack_rule,
            &flag,
            detail_notrack_rule,
            FlagValue::from(false),
            None,
        );
        let fre_fallthrough = event_factory.new_eval_event(
            &flag.key,
            context_fallthrough,
            &flag,
            detail_fallthrough,
            FlagValue::from(false),
            None,
        );
        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
        for fre in [fre_track_rule, fre_notrack_rule, fre_fallthrough] {
            event_processor.send(fre);
        }
        event_processor.flush();
        event_processor.close();
        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
        assert_eq!(events.len(), 2 + 1 + 2 + 1);
        assert_eq!(
            events
                .iter()
                .filter(|event| event.kind() == "feature")
                .count(),
            2
        );
        assert!(events.iter().any(|e| e.kind() == "index"));
        assert!(events.iter().any(|e| e.kind() == "summary"));
    }
    #[test]
    fn feature_events_dedupe_index_events() {
        let flag = basic_flag("flag");
        let context = ContextBuilder::new("bar")
            .build()
            .expect("Failed to create context");
        let detail = Detail {
            value: Some(FlagValue::from(false)),
            variation_index: Some(1),
            reason: Reason::Fallthrough {
                in_experiment: false,
            },
        };
        let event_factory = EventFactory::new(true);
        let feature_request = event_factory.new_eval_event(
            &flag.key,
            context,
            &flag,
            detail,
            FlagValue::from(false),
            None,
        );
        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
        event_processor.send(feature_request.clone());
        event_processor.send(feature_request);
        event_processor.flush();
        event_processor.close();
        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events
                .iter()
                .filter(|event| event.kind() == "index")
                .count(),
            1
        );
        assert_eq!(
            events
                .iter()
                .filter(|event| event.kind() == "summary")
                .count(),
            1
        );
    }
}