Skip to main content

winterbaume_keyspaces/
views.rs

1//! Serde-compatible view types for Keyspaces state snapshots.
2
3use std::collections::HashMap;
4
5use serde::{Deserialize, Serialize};
6use winterbaume_core::{StateChangeNotifier, StateViewError, StatefulService};
7
8use crate::handlers::KeyspacesService;
9use crate::state::KeyspacesState;
10use crate::types::{
11    ClusteringKey, ColumnDefinition, FieldDefinition, Keyspace, SchemaDefinition, Table,
12    UserDefinedType,
13};
14
15/// Serializable view of the entire Keyspaces state for one account/region.
16#[derive(Debug, Clone, Default, Serialize, Deserialize)]
17pub struct KeyspacesStateView {
18    #[serde(default)]
19    pub keyspaces: HashMap<String, KeyspaceView>,
20    /// Key: "keyspace_name/table_name"
21    #[serde(default)]
22    pub tables: HashMap<String, TableView>,
23    /// Key: "keyspace_name/type_name"
24    #[serde(default)]
25    pub types: HashMap<String, UserDefinedTypeView>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct KeyspaceView {
30    pub name: String,
31    pub arn: String,
32    pub replication_strategy: String,
33    #[serde(default)]
34    pub replication_regions: Vec<String>,
35    #[serde(default)]
36    pub tags: HashMap<String, String>,
37    pub creation_timestamp: Option<String>,
38    #[serde(default)]
39    pub status: String,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct TableView {
44    pub keyspace_name: String,
45    pub table_name: String,
46    pub arn: String,
47    pub schema_definition: SchemaDefinitionView,
48    pub capacity_mode: String,
49    pub read_capacity_units: Option<i64>,
50    pub write_capacity_units: Option<i64>,
51    pub encryption_type: String,
52    pub kms_key_identifier: Option<String>,
53    pub point_in_time_recovery_enabled: bool,
54    pub ttl_status: String,
55    pub default_time_to_live: Option<i32>,
56    #[serde(default)]
57    pub comment: String,
58    pub client_side_timestamps_enabled: bool,
59    #[serde(default)]
60    pub tags: HashMap<String, String>,
61    pub creation_timestamp: Option<String>,
62    #[serde(default)]
63    pub status: String,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct SchemaDefinitionView {
68    pub all_columns: Vec<ColumnDefinitionView>,
69    pub partition_keys: Vec<String>,
70    #[serde(default)]
71    pub clustering_keys: Vec<ClusteringKeyView>,
72    #[serde(default)]
73    pub static_columns: Vec<String>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct ColumnDefinitionView {
78    pub name: String,
79    pub column_type: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct ClusteringKeyView {
84    pub name: String,
85    pub order_by: String,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct UserDefinedTypeView {
90    pub keyspace_name: String,
91    pub type_name: String,
92    pub field_definitions: Vec<FieldDefinitionView>,
93    pub creation_timestamp: Option<String>,
94    #[serde(default)]
95    pub status: String,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct FieldDefinitionView {
100    pub name: String,
101    pub field_type: String,
102}
103
104// ---- From conversions ----
105
106impl From<&Keyspace> for KeyspaceView {
107    fn from(ks: &Keyspace) -> Self {
108        Self {
109            name: ks.name.clone(),
110            arn: ks.arn.clone(),
111            replication_strategy: ks.replication_strategy.clone(),
112            replication_regions: ks.replication_regions.clone(),
113            tags: ks.tags.clone(),
114            creation_timestamp: Some(ks.creation_timestamp.to_rfc3339()),
115            status: ks.status.clone(),
116        }
117    }
118}
119
120impl From<&Table> for TableView {
121    fn from(t: &Table) -> Self {
122        Self {
123            keyspace_name: t.keyspace_name.clone(),
124            table_name: t.table_name.clone(),
125            arn: t.arn.clone(),
126            schema_definition: SchemaDefinitionView::from(&t.schema_definition),
127            capacity_mode: t.capacity_mode.clone(),
128            read_capacity_units: t.read_capacity_units,
129            write_capacity_units: t.write_capacity_units,
130            encryption_type: t.encryption_type.clone(),
131            kms_key_identifier: t.kms_key_identifier.clone(),
132            point_in_time_recovery_enabled: t.point_in_time_recovery_enabled,
133            ttl_status: t.ttl_status.clone(),
134            default_time_to_live: t.default_time_to_live,
135            comment: t.comment.clone(),
136            client_side_timestamps_enabled: t.client_side_timestamps_enabled,
137            tags: t.tags.clone(),
138            creation_timestamp: Some(t.creation_timestamp.to_rfc3339()),
139            status: t.status.clone(),
140        }
141    }
142}
143
144impl From<&SchemaDefinition> for SchemaDefinitionView {
145    fn from(s: &SchemaDefinition) -> Self {
146        Self {
147            all_columns: s
148                .all_columns
149                .iter()
150                .map(|c| ColumnDefinitionView {
151                    name: c.name.clone(),
152                    column_type: c.column_type.clone(),
153                })
154                .collect(),
155            partition_keys: s.partition_keys.clone(),
156            clustering_keys: s
157                .clustering_keys
158                .iter()
159                .map(|c| ClusteringKeyView {
160                    name: c.name.clone(),
161                    order_by: c.order_by.clone(),
162                })
163                .collect(),
164            static_columns: s.static_columns.clone(),
165        }
166    }
167}
168
169impl From<&UserDefinedType> for UserDefinedTypeView {
170    fn from(t: &UserDefinedType) -> Self {
171        Self {
172            keyspace_name: t.keyspace_name.clone(),
173            type_name: t.type_name.clone(),
174            field_definitions: t
175                .field_definitions
176                .iter()
177                .map(|f| FieldDefinitionView {
178                    name: f.name.clone(),
179                    field_type: f.field_type.clone(),
180                })
181                .collect(),
182            creation_timestamp: Some(t.creation_timestamp.to_rfc3339()),
183            status: t.status.clone(),
184        }
185    }
186}
187
188impl From<KeyspacesStateView> for KeyspacesState {
189    fn from(view: KeyspacesStateView) -> Self {
190        let keyspaces = view
191            .keyspaces
192            .into_iter()
193            .map(|(k, v)| {
194                let ks = Keyspace {
195                    name: v.name,
196                    arn: v.arn,
197                    replication_strategy: v.replication_strategy,
198                    replication_regions: v.replication_regions,
199                    tags: v.tags,
200                    creation_timestamp: v
201                        .creation_timestamp
202                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
203                        .map(|dt| dt.with_timezone(&chrono::Utc))
204                        .unwrap_or_else(chrono::Utc::now),
205                    status: v.status,
206                };
207                (k, ks)
208            })
209            .collect();
210
211        let tables = view
212            .tables
213            .into_values()
214            .map(|v| {
215                let table = Table {
216                    keyspace_name: v.keyspace_name.clone(),
217                    table_name: v.table_name.clone(),
218                    arn: v.arn,
219                    schema_definition: SchemaDefinition {
220                        all_columns: v
221                            .schema_definition
222                            .all_columns
223                            .into_iter()
224                            .map(|c| ColumnDefinition {
225                                name: c.name,
226                                column_type: c.column_type,
227                            })
228                            .collect(),
229                        partition_keys: v.schema_definition.partition_keys,
230                        clustering_keys: v
231                            .schema_definition
232                            .clustering_keys
233                            .into_iter()
234                            .map(|c| ClusteringKey {
235                                name: c.name,
236                                order_by: c.order_by,
237                            })
238                            .collect(),
239                        static_columns: v.schema_definition.static_columns,
240                    },
241                    capacity_mode: v.capacity_mode,
242                    read_capacity_units: v.read_capacity_units,
243                    write_capacity_units: v.write_capacity_units,
244                    encryption_type: v.encryption_type,
245                    kms_key_identifier: v.kms_key_identifier,
246                    point_in_time_recovery_enabled: v.point_in_time_recovery_enabled,
247                    ttl_status: v.ttl_status,
248                    default_time_to_live: v.default_time_to_live,
249                    comment: v.comment,
250                    client_side_timestamps_enabled: v.client_side_timestamps_enabled,
251                    tags: v.tags,
252                    creation_timestamp: v
253                        .creation_timestamp
254                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
255                        .map(|dt| dt.with_timezone(&chrono::Utc))
256                        .unwrap_or_else(chrono::Utc::now),
257                    status: v.status,
258                };
259                (
260                    (table.keyspace_name.clone(), table.table_name.clone()),
261                    table,
262                )
263            })
264            .collect();
265
266        let types = view
267            .types
268            .into_values()
269            .map(|v| {
270                let udt = UserDefinedType {
271                    keyspace_name: v.keyspace_name.clone(),
272                    type_name: v.type_name.clone(),
273                    field_definitions: v
274                        .field_definitions
275                        .into_iter()
276                        .map(|f| FieldDefinition {
277                            name: f.name,
278                            field_type: f.field_type,
279                        })
280                        .collect(),
281                    creation_timestamp: v
282                        .creation_timestamp
283                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
284                        .map(|dt| dt.with_timezone(&chrono::Utc))
285                        .unwrap_or_else(chrono::Utc::now),
286                    status: v.status,
287                };
288                ((udt.keyspace_name.clone(), udt.type_name.clone()), udt)
289            })
290            .collect();
291
292        KeyspacesState {
293            keyspaces,
294            tables,
295            types,
296        }
297    }
298}
299
300impl From<&KeyspacesState> for KeyspacesStateView {
301    fn from(state: &KeyspacesState) -> Self {
302        Self {
303            keyspaces: state
304                .keyspaces
305                .iter()
306                .map(|(k, v)| (k.clone(), KeyspaceView::from(v)))
307                .collect(),
308            tables: state
309                .tables
310                .iter()
311                .map(|((ks, tn), v)| (format!("{ks}/{tn}"), TableView::from(v)))
312                .collect(),
313            types: state
314                .types
315                .iter()
316                .map(|((ks, tn), v)| (format!("{ks}/{tn}"), UserDefinedTypeView::from(v)))
317                .collect(),
318        }
319    }
320}
321
322impl StatefulService for KeyspacesService {
323    type StateView = KeyspacesStateView;
324
325    async fn snapshot(&self, account_id: &str, region: &str) -> Self::StateView {
326        let state = self.state.get(account_id, region);
327        let guard = state.read().await;
328        KeyspacesStateView::from(&*guard)
329    }
330
331    async fn restore(
332        &self,
333        account_id: &str,
334        region: &str,
335        view: Self::StateView,
336    ) -> Result<(), StateViewError> {
337        let state = self.state.get(account_id, region);
338        {
339            let mut guard = state.write().await;
340            *guard = KeyspacesState::from(view);
341        }
342        self.notify_state_changed(account_id, region).await;
343        Ok(())
344    }
345
346    async fn merge(
347        &self,
348        account_id: &str,
349        region: &str,
350        view: Self::StateView,
351    ) -> Result<(), StateViewError> {
352        let state = self.state.get(account_id, region);
353        {
354            let mut guard = state.write().await;
355            // Merge keyspaces.
356            for (name, ks_view) in view.keyspaces {
357                let ks = Keyspace {
358                    name: ks_view.name,
359                    arn: ks_view.arn,
360                    replication_strategy: ks_view.replication_strategy,
361                    replication_regions: ks_view.replication_regions,
362                    tags: ks_view.tags,
363                    creation_timestamp: ks_view
364                        .creation_timestamp
365                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
366                        .map(|dt| dt.with_timezone(&chrono::Utc))
367                        .unwrap_or_else(chrono::Utc::now),
368                    status: ks_view.status,
369                };
370                guard.keyspaces.insert(name, ks);
371            }
372            // Merge tables.
373            for tv in view.tables.into_values() {
374                let table = Table {
375                    keyspace_name: tv.keyspace_name.clone(),
376                    table_name: tv.table_name.clone(),
377                    arn: tv.arn,
378                    schema_definition: SchemaDefinition {
379                        all_columns: tv
380                            .schema_definition
381                            .all_columns
382                            .into_iter()
383                            .map(|c| ColumnDefinition {
384                                name: c.name,
385                                column_type: c.column_type,
386                            })
387                            .collect(),
388                        partition_keys: tv.schema_definition.partition_keys,
389                        clustering_keys: tv
390                            .schema_definition
391                            .clustering_keys
392                            .into_iter()
393                            .map(|c| ClusteringKey {
394                                name: c.name,
395                                order_by: c.order_by,
396                            })
397                            .collect(),
398                        static_columns: tv.schema_definition.static_columns,
399                    },
400                    capacity_mode: tv.capacity_mode,
401                    read_capacity_units: tv.read_capacity_units,
402                    write_capacity_units: tv.write_capacity_units,
403                    encryption_type: tv.encryption_type,
404                    kms_key_identifier: tv.kms_key_identifier,
405                    point_in_time_recovery_enabled: tv.point_in_time_recovery_enabled,
406                    ttl_status: tv.ttl_status,
407                    default_time_to_live: tv.default_time_to_live,
408                    comment: tv.comment,
409                    client_side_timestamps_enabled: tv.client_side_timestamps_enabled,
410                    tags: tv.tags,
411                    creation_timestamp: tv
412                        .creation_timestamp
413                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
414                        .map(|dt| dt.with_timezone(&chrono::Utc))
415                        .unwrap_or_else(chrono::Utc::now),
416                    status: tv.status,
417                };
418                guard.tables.insert(
419                    (table.keyspace_name.clone(), table.table_name.clone()),
420                    table,
421                );
422            }
423            // Merge types.
424            for tv in view.types.into_values() {
425                let udt = UserDefinedType {
426                    keyspace_name: tv.keyspace_name.clone(),
427                    type_name: tv.type_name.clone(),
428                    field_definitions: tv
429                        .field_definitions
430                        .into_iter()
431                        .map(|f| FieldDefinition {
432                            name: f.name,
433                            field_type: f.field_type,
434                        })
435                        .collect(),
436                    creation_timestamp: tv
437                        .creation_timestamp
438                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
439                        .map(|dt| dt.with_timezone(&chrono::Utc))
440                        .unwrap_or_else(chrono::Utc::now),
441                    status: tv.status,
442                };
443                guard
444                    .types
445                    .insert((udt.keyspace_name.clone(), udt.type_name.clone()), udt);
446            }
447        }
448        self.notify_state_changed(account_id, region).await;
449        Ok(())
450    }
451
452    fn notifier(&self) -> &StateChangeNotifier<Self::StateView> {
453        &self.notifier
454    }
455}