1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
mod configmap;
mod secret;

use std::sync::{atomic::AtomicBool, Arc};

use async_trait::async_trait;
use crossbeam::channel::Sender;

use crate::{
    error::Result,
    event::{
        kubernetes::{client::KubeClient, worker::Worker},
        Event,
    },
};

use self::{configmap::ConfigMapDataWorker, secret::SecretDataWorker};

use super::{ConfigData, ConfigRequest, ConfigResponse, RequestData};

#[derive(Clone)]
pub struct ConfigsDataWorker {
    is_terminated: Arc<AtomicBool>,
    tx: Sender<Event>,
    client: KubeClient,
    req: ConfigRequest,
}

impl ConfigsDataWorker {
    pub fn new(
        is_terminated: Arc<AtomicBool>,
        tx: Sender<Event>,
        client: KubeClient,
        req: ConfigRequest,
    ) -> Self {
        Self {
            is_terminated,
            tx,
            client,
            req,
        }
    }
}

#[async_trait]
impl Worker for ConfigsDataWorker {
    type Output = ();

    async fn run(&self) -> Self::Output {
        let ret = match &self.req {
            ConfigRequest::ConfigMap(_) => self.fetch_description::<ConfigMapDataWorker>().await,
            ConfigRequest::Secret(_) => self.fetch_description::<SecretDataWorker>().await,
        };

        if let Err(e) = ret {
            self.tx
                .send(ConfigResponse::Data(Err(e)).into())
                .expect("Failed to send ConfigResponse::Data");
        }
    }
}

#[async_trait]
trait Fetch<'a> {
    fn new(client: &'a KubeClient, namespace: String, name: String) -> Self;

    async fn fetch(&self) -> Result<ConfigData>;
}

const INTERVAL: u64 = 3;

impl ConfigsDataWorker {
    async fn fetch_description<'a, Worker>(&'a self) -> Result<()>
    where
        Worker: Fetch<'a>,
    {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(INTERVAL));

        let RequestData { name, namespace } = self.req.data();

        let worker = Worker::new(&self.client, namespace.to_string(), name.to_string());

        while !self
            .is_terminated
            .load(std::sync::atomic::Ordering::Relaxed)
        {
            interval.tick().await;

            let fetched_data = worker.fetch().await;

            self.tx
                .send(ConfigResponse::Data(fetched_data).into())
                .expect("Failed to send ConfigResponse::Data");
        }

        Ok(())
    }
}