open_feature_flagd/resolver/in_process/storage/
mod.rs1pub mod connector;
2pub use connector::{Connector, QueuePayload, QueuePayloadType};
3use tracing::{debug, error};
4
5use crate::resolver::in_process::model::feature_flag::FeatureFlag;
6use crate::resolver::in_process::model::flag_parser::FlagParser;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::mpsc::{channel, Receiver, Sender};
10use tokio::sync::RwLock;
11
/// Health of the flag storage as reported to consumers of state-change events.
///
/// The enum is fieldless, so equality is total and `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageState {
    /// The store was updated from a successfully parsed data payload.
    Ok,
    /// Not emitted by the code in this module; presumably signals that the stored
    /// flags may be out of date — TODO confirm against the consumers of this enum.
    Stale,
    /// The connector delivered an error payload.
    Error,
}
18
19#[derive(Debug, Clone)]
20pub struct StorageStateChange {
21 pub storage_state: StorageState,
22 pub changed_flags_keys: Vec<String>,
23 pub sync_metadata: HashMap<String, serde_json::Value>,
24}
25
26#[derive(Debug, Clone)]
27pub struct StorageQueryResult {
28 pub feature_flag: Option<FeatureFlag>,
29 pub flag_set_metadata: HashMap<String, serde_json::Value>,
30}
31
32pub struct FlagStore {
33 flags: Arc<RwLock<HashMap<String, FeatureFlag>>>,
34 flag_set_metadata: Arc<RwLock<HashMap<String, serde_json::Value>>>,
35 state_sender: Sender<StorageStateChange>,
36 connector: Arc<dyn Connector>,
37}
38
39impl FlagStore {
40 pub fn new(connector: Arc<dyn Connector>) -> (Self, Receiver<StorageStateChange>) {
41 let (state_sender, state_receiver) = channel(1000);
42
43 (
44 Self {
45 flags: Arc::new(RwLock::new(HashMap::new())),
46 flag_set_metadata: Arc::new(RwLock::new(HashMap::new())),
47 state_sender,
48 connector,
49 },
50 state_receiver,
51 )
52 }
53
54 pub async fn init(&self) -> anyhow::Result<()> {
55 debug!("Initializing flag store");
56 self.connector.init().await?;
57
58 let stream = self.connector.get_stream();
60 let mut receiver = stream.lock().await;
61 debug!("Waiting for initial sync message");
62
63 if let Some(receiver_ref) = receiver.as_mut() {
64 match tokio::time::timeout(std::time::Duration::from_secs(5), receiver_ref.recv())
65 .await?
66 {
67 Some(payload) => {
68 debug!("Received initial sync message");
69 match payload.payload_type {
70 QueuePayloadType::Data => {
71 debug!("Parsing flag data: {}", &payload.flag_data);
72 let parsing_result = FlagParser::parse_string(&payload.flag_data)?;
73 let mut flags_write = self.flags.write().await;
74 let mut metadata_write = self.flag_set_metadata.write().await;
75 *flags_write = parsing_result.flags;
76 *metadata_write = parsing_result.flag_set_metadata;
77 debug!("Successfully parsed {} flags", flags_write.len());
78 }
79 QueuePayloadType::Error => {
80 error!("Error in initial sync");
81 return Err(anyhow::anyhow!("Error in initial sync"));
82 }
83 }
84 }
85 None => {
86 error!("No initial sync message received");
87 return Err(anyhow::anyhow!("No initial sync message received"));
88 }
89 }
90 }
91
92 self.start_stream_listener().await;
94 Ok(())
95 }
96
97 pub async fn shutdown(&self) -> anyhow::Result<()> {
98 self.connector.shutdown().await
99 }
100
101 pub async fn get_flag(&self, key: &str) -> StorageQueryResult {
102 let flags = self.flags.read().await;
103 let metadata = self.flag_set_metadata.read().await;
104
105 StorageQueryResult {
106 feature_flag: flags.get(key).cloned(),
107 flag_set_metadata: metadata.clone(),
108 }
109 }
110
111 async fn start_stream_listener(&self) {
112 let flags = self.flags.clone();
113 let metadata = self.flag_set_metadata.clone();
114 let sender = self.state_sender.clone();
115 let stream = self.connector.get_stream();
116
117 tokio::spawn(async move {
118 let mut receiver = stream.lock().await;
119 if let Some(receiver) = receiver.as_mut() {
120 while let Some(payload) = receiver.recv().await {
121 match payload.payload_type {
122 QueuePayloadType::Data => {
123 if let Ok(parsing_result) = FlagParser::parse_string(&payload.flag_data)
124 {
125 let mut flags_write = flags.write().await;
126 let mut metadata_write = metadata.write().await;
127 *flags_write = parsing_result.flags;
128 *metadata_write = parsing_result.flag_set_metadata;
129 let _ = sender
130 .send(StorageStateChange {
131 storage_state: StorageState::Ok,
132 changed_flags_keys: vec![],
133 sync_metadata: payload.metadata.unwrap_or_default(),
134 })
135 .await;
136 }
137 }
138 QueuePayloadType::Error => {
139 let _ = sender
140 .send(StorageStateChange {
141 storage_state: StorageState::Error,
142 changed_flags_keys: vec![],
143 sync_metadata: HashMap::new(),
144 })
145 .await;
146 }
147 }
148 }
149 }
150 });
151 }
152}