1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use std::collections::HashMap;
use std::future::Future;

use futures_util::future::join_all;
use summa_core::components::IndexHolder;
use summa_core::utils::sync::Handler;
use tracing::{info, instrument};

use crate::components::consumers::ConsumerThread;
use crate::errors::{SummaServerResult, ValidationError};

#[derive(Debug)]
pub struct StoppedConsumption {
    consumer_thread: Box<dyn ConsumerThread>,
}

impl StoppedConsumption {
    pub async fn commit_offsets(self) -> SummaServerResult<PreparedConsumption> {
        self.consumer_thread.commit().await?;
        Ok(PreparedConsumption {
            committed_consumer_thread: self.consumer_thread,
        })
    }

    pub fn ignore(self) -> PreparedConsumption {
        PreparedConsumption {
            committed_consumer_thread: self.consumer_thread,
        }
    }
}

#[derive(Debug)]
pub struct PreparedConsumption {
    committed_consumer_thread: Box<dyn ConsumerThread>,
}

impl PreparedConsumption {
    #[cfg(not(feature = "kafka"))]
    pub fn from_config(_consumer_name: &str, _consumer_config: &crate::configs::consumer::Config) -> SummaServerResult<PreparedConsumption> {
        unimplemented!();
    }
    #[cfg(feature = "kafka")]
    pub fn from_config(consumer_name: &str, consumer_config: &crate::configs::consumer::Config) -> SummaServerResult<PreparedConsumption> {
        Ok(PreparedConsumption {
            committed_consumer_thread: Box::new(crate::components::consumers::kafka::KafkaConsumerThread::new(consumer_name, consumer_config)?)
                as Box<dyn ConsumerThread>,
        })
    }

    pub fn on_create(&self) -> impl Future<Output = SummaServerResult<()>> + '_ {
        self.committed_consumer_thread.on_create()
    }

    pub fn on_delete(&self) -> impl Future<Output = SummaServerResult<()>> + '_ {
        self.committed_consumer_thread.on_delete()
    }

    pub fn consumer_name(&self) -> &str {
        self.committed_consumer_thread.consumer_name()
    }
}

#[derive(Debug, Default)]
pub struct ConsumerManager {
    consumptions: HashMap<Handler<IndexHolder>, Box<dyn ConsumerThread>>,
}

impl ConsumerManager {
    pub fn new() -> ConsumerManager {
        ConsumerManager { consumptions: HashMap::new() }
    }

    /// Starts prepared consumption to the index
    #[instrument(skip(self, index_holder, prepared_consumption), fields(index_name = index_holder.index_name(), consumer_name = prepared_consumption.consumer_name()))]
    pub async fn start_consuming(&mut self, index_holder: &Handler<IndexHolder>, prepared_consumption: PreparedConsumption) -> SummaServerResult<()> {
        if self.consumptions.contains_key(index_holder) {
            return Err(ValidationError::ExistingConsumer(index_holder.index_name().to_string()).into());
        }
        let index_writer_holder = index_holder.index_writer_holder()?.clone().read_owned().await;
        let schema = index_holder.schema().clone();
        let conflict_strategy = index_holder.conflict_strategy();
        prepared_consumption
            .committed_consumer_thread
            .start(index_writer_holder, conflict_strategy, schema)
            .await?;
        self.consumptions.insert(index_holder.clone(), prepared_consumption.committed_consumer_thread);
        Ok(())
    }

    /// Stops all consuming threads
    #[instrument(skip(self))]
    pub async fn stop(&mut self) -> SummaServerResult<()> {
        info!(action = "stopping");
        join_all(self.consumptions.drain().map(|(index_holder, consumer_thread)| async move {
            consumer_thread.stop().await?;
            let stopped_consumption = StoppedConsumption { consumer_thread };
            let mut index_writer_holder = index_holder.index_writer_holder()?.clone().write_owned().await;
            tokio::task::spawn_blocking(move || index_writer_holder.commit_and_prepare(false)).await??;
            stopped_consumption.commit_offsets().await?;
            Ok(())
        }))
        .await
        .into_iter()
        .collect::<SummaServerResult<_>>()
    }

    /// Stops particular `IndexHolder`
    #[instrument(skip(self))]
    pub async fn stop_consuming_for(&mut self, index_holder: &Handler<IndexHolder>) -> SummaServerResult<Option<StoppedConsumption>> {
        if let Some(consumer_thread) = self.consumptions.remove(index_holder) {
            consumer_thread.stop().await?;
            Ok(Some(StoppedConsumption { consumer_thread }))
        } else {
            Ok(None)
        }
    }

    pub async fn get_consumer_for(&self, index_holder: &Handler<IndexHolder>) -> Option<&Box<dyn ConsumerThread>> {
        self.consumptions.get(index_holder)
    }

    pub fn find_index_holder_for(&self, consumer_name: &str) -> Option<Handler<IndexHolder>> {
        for (index_holder, consumer_thread) in &self.consumptions {
            if consumer_thread.consumer_name() == consumer_name {
                return Some(index_holder.clone());
            }
        }
        None
    }

    pub fn find_consumer_config_for(&self, consumer_name: &str) -> Option<&crate::configs::consumer::Config> {
        for consumer_thread in self.consumptions.values() {
            if consumer_thread.consumer_name() == consumer_name {
                return Some(consumer_thread.config());
            }
        }
        None
    }
}