open_feature_flagd/resolver/in_process/storage/
mod.rs

1pub mod connector;
2pub use connector::{Connector, QueuePayload, QueuePayloadType};
3use tracing::{debug, error};
4
5use crate::resolver::in_process::model::feature_flag::FeatureFlag;
6use crate::resolver::in_process::model::flag_parser::FlagParser;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::mpsc::{channel, Receiver, Sender};
10use tokio::sync::RwLock;
11
/// Health of the flag storage as reported to consumers of the state channel.
///
/// `Eq` is derived in addition to `PartialEq` because the enum carries no data,
/// so equality is total.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageState {
    /// The most recent sync payload was parsed and stored successfully.
    Ok,
    /// NOTE(review): not produced anywhere in this module; presumably marks
    /// data that may be outdated — confirm against the consumers.
    Stale,
    /// The connector reported an error payload.
    Error,
}
18
19#[derive(Debug, Clone)]
20pub struct StorageStateChange {
21    pub storage_state: StorageState,
22    pub changed_flags_keys: Vec<String>,
23    pub sync_metadata: HashMap<String, serde_json::Value>,
24}
25
26#[derive(Debug, Clone)]
27pub struct StorageQueryResult {
28    pub feature_flag: Option<FeatureFlag>,
29    pub flag_set_metadata: HashMap<String, serde_json::Value>,
30}
31
32pub struct FlagStore {
33    flags: Arc<RwLock<HashMap<String, FeatureFlag>>>,
34    flag_set_metadata: Arc<RwLock<HashMap<String, serde_json::Value>>>,
35    state_sender: Sender<StorageStateChange>,
36    connector: Arc<dyn Connector>,
37}
38
39impl FlagStore {
40    pub fn new(connector: Arc<dyn Connector>) -> (Self, Receiver<StorageStateChange>) {
41        let (state_sender, state_receiver) = channel(1000);
42
43        (
44            Self {
45                flags: Arc::new(RwLock::new(HashMap::new())),
46                flag_set_metadata: Arc::new(RwLock::new(HashMap::new())),
47                state_sender,
48                connector,
49            },
50            state_receiver,
51        )
52    }
53
54    pub async fn init(&self) -> anyhow::Result<()> {
55        debug!("Initializing flag store");
56        self.connector.init().await?;
57
58        // Handle initial sync
59        let stream = self.connector.get_stream();
60        let mut receiver = stream.lock().await;
61        debug!("Waiting for initial sync message");
62
63        if let Some(receiver_ref) = receiver.as_mut() {
64            match tokio::time::timeout(std::time::Duration::from_secs(5), receiver_ref.recv())
65                .await?
66            {
67                Some(payload) => {
68                    debug!("Received initial sync message");
69                    match payload.payload_type {
70                        QueuePayloadType::Data => {
71                            debug!("Parsing flag data: {}", &payload.flag_data);
72                            let parsing_result = FlagParser::parse_string(&payload.flag_data)?;
73                            let mut flags_write = self.flags.write().await;
74                            let mut metadata_write = self.flag_set_metadata.write().await;
75                            *flags_write = parsing_result.flags;
76                            *metadata_write = parsing_result.flag_set_metadata;
77                            debug!("Successfully parsed {} flags", flags_write.len());
78                        }
79                        QueuePayloadType::Error => {
80                            error!("Error in initial sync");
81                            return Err(anyhow::anyhow!("Error in initial sync"));
82                        }
83                    }
84                }
85                None => {
86                    error!("No initial sync message received");
87                    return Err(anyhow::anyhow!("No initial sync message received"));
88                }
89            }
90        }
91
92        // Start continuous stream processing
93        self.start_stream_listener().await;
94        Ok(())
95    }
96
97    pub async fn shutdown(&self) -> anyhow::Result<()> {
98        self.connector.shutdown().await
99    }
100
101    pub async fn get_flag(&self, key: &str) -> StorageQueryResult {
102        let flags = self.flags.read().await;
103        let metadata = self.flag_set_metadata.read().await;
104
105        StorageQueryResult {
106            feature_flag: flags.get(key).cloned(),
107            flag_set_metadata: metadata.clone(),
108        }
109    }
110
111    async fn start_stream_listener(&self) {
112        let flags = self.flags.clone();
113        let metadata = self.flag_set_metadata.clone();
114        let sender = self.state_sender.clone();
115        let stream = self.connector.get_stream();
116
117        tokio::spawn(async move {
118            let mut receiver = stream.lock().await;
119            if let Some(receiver) = receiver.as_mut() {
120                while let Some(payload) = receiver.recv().await {
121                    match payload.payload_type {
122                        QueuePayloadType::Data => {
123                            if let Ok(parsing_result) = FlagParser::parse_string(&payload.flag_data)
124                            {
125                                let mut flags_write = flags.write().await;
126                                let mut metadata_write = metadata.write().await;
127                                *flags_write = parsing_result.flags;
128                                *metadata_write = parsing_result.flag_set_metadata;
129                                let _ = sender
130                                    .send(StorageStateChange {
131                                        storage_state: StorageState::Ok,
132                                        changed_flags_keys: vec![],
133                                        sync_metadata: payload.metadata.unwrap_or_default(),
134                                    })
135                                    .await;
136                            }
137                        }
138                        QueuePayloadType::Error => {
139                            let _ = sender
140                                .send(StorageStateChange {
141                                    storage_state: StorageState::Error,
142                                    changed_flags_keys: vec![],
143                                    sync_metadata: HashMap::new(),
144                                })
145                                .await;
146                        }
147                    }
148                }
149            }
150        });
151    }
152}