1use std::collections::HashMap;
4
5use serde::{Deserialize, Serialize};
6use winterbaume_core::{StateChangeNotifier, StateViewError, StatefulService};
7
8use crate::handlers::KeyspacesService;
9use crate::state::KeyspacesState;
10use crate::types::{
11 ClusteringKey, ColumnDefinition, FieldDefinition, Keyspace, SchemaDefinition, Table,
12 UserDefinedType,
13};
14
/// Serializable view of a [`KeyspacesState`], used as the `StateView` of the service's
/// snapshot / restore / merge operations.
///
/// Every map is marked `#[serde(default)]`, so a serialized document that omits a map
/// deserializes it as empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct KeyspacesStateView {
    /// Keyspaces. The map key is carried over unchanged in both conversion directions.
    #[serde(default)]
    pub keyspaces: HashMap<String, KeyspaceView>,
    /// Tables. Snapshots key each entry as `"{keyspace}/{table}"`; when converting back to
    /// state the key is discarded and rebuilt from the entry's `keyspace_name` and
    /// `table_name` fields.
    #[serde(default)]
    pub tables: HashMap<String, TableView>,
    /// User-defined types. Snapshots key each entry as `"{keyspace}/{type}"`; when converting
    /// back to state the key is discarded and rebuilt from the entry's `keyspace_name` and
    /// `type_name` fields.
    #[serde(default)]
    pub types: HashMap<String, UserDefinedTypeView>,
}
27
/// Serializable counterpart of [`Keyspace`].
///
/// Fields mirror the model one-to-one, except that the creation time is carried as an
/// optional RFC 3339 string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeyspaceView {
    pub name: String,
    pub arn: String,
    pub replication_strategy: String,
    /// Empty when absent from the serialized form.
    #[serde(default)]
    pub replication_regions: Vec<String>,
    /// Empty when absent from the serialized form.
    #[serde(default)]
    pub tags: HashMap<String, String>,
    /// RFC 3339 creation time. Views built from a [`Keyspace`] always carry `Some`. When a
    /// view is converted back, `None` or an unparseable string is replaced by the current
    /// UTC time.
    pub creation_timestamp: Option<String>,
    /// Empty string when absent from the serialized form.
    #[serde(default)]
    pub status: String,
}
41
/// Serializable counterpart of [`Table`].
///
/// Fields mirror the model one-to-one, except that the schema is carried as a
/// [`SchemaDefinitionView`] and the creation time as an optional RFC 3339 string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableView {
    /// Together with `table_name`, forms the state map key when the view is converted back.
    pub keyspace_name: String,
    /// Together with `keyspace_name`, forms the state map key when the view is converted back.
    pub table_name: String,
    pub arn: String,
    pub schema_definition: SchemaDefinitionView,
    pub capacity_mode: String,
    pub read_capacity_units: Option<i64>,
    pub write_capacity_units: Option<i64>,
    pub encryption_type: String,
    pub kms_key_identifier: Option<String>,
    pub point_in_time_recovery_enabled: bool,
    pub ttl_status: String,
    pub default_time_to_live: Option<i32>,
    /// Empty string when absent from the serialized form.
    #[serde(default)]
    pub comment: String,
    pub client_side_timestamps_enabled: bool,
    /// Empty when absent from the serialized form.
    #[serde(default)]
    pub tags: HashMap<String, String>,
    /// RFC 3339 creation time. Views built from a [`Table`] always carry `Some`. When a view
    /// is converted back, `None` or an unparseable string is replaced by the current UTC time.
    pub creation_timestamp: Option<String>,
    /// Empty string when absent from the serialized form.
    #[serde(default)]
    pub status: String,
}
65
/// Serializable counterpart of [`SchemaDefinition`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaDefinitionView {
    /// Column definitions, in the same order as in the model.
    pub all_columns: Vec<ColumnDefinitionView>,
    pub partition_keys: Vec<String>,
    /// Empty when absent from the serialized form.
    #[serde(default)]
    pub clustering_keys: Vec<ClusteringKeyView>,
    /// Empty when absent from the serialized form.
    #[serde(default)]
    pub static_columns: Vec<String>,
}
75
/// Serializable counterpart of [`ColumnDefinition`]: a column name and its type, both carried
/// as strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnDefinitionView {
    pub name: String,
    pub column_type: String,
}
81
/// Serializable counterpart of [`ClusteringKey`]: a key name and its ordering, both carried
/// as strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusteringKeyView {
    pub name: String,
    pub order_by: String,
}
87
/// Serializable counterpart of [`UserDefinedType`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserDefinedTypeView {
    /// Together with `type_name`, forms the state map key when the view is converted back.
    pub keyspace_name: String,
    /// Together with `keyspace_name`, forms the state map key when the view is converted back.
    pub type_name: String,
    pub field_definitions: Vec<FieldDefinitionView>,
    /// RFC 3339 creation time. Views built from a [`UserDefinedType`] always carry `Some`.
    /// When a view is converted back, `None` or an unparseable string is replaced by the
    /// current UTC time.
    pub creation_timestamp: Option<String>,
    /// Empty string when absent from the serialized form.
    #[serde(default)]
    pub status: String,
}
97
/// Serializable counterpart of [`FieldDefinition`]: a field name and its type, both carried
/// as strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldDefinitionView {
    pub name: String,
    pub field_type: String,
}
103
104impl From<&Keyspace> for KeyspaceView {
107 fn from(ks: &Keyspace) -> Self {
108 Self {
109 name: ks.name.clone(),
110 arn: ks.arn.clone(),
111 replication_strategy: ks.replication_strategy.clone(),
112 replication_regions: ks.replication_regions.clone(),
113 tags: ks.tags.clone(),
114 creation_timestamp: Some(ks.creation_timestamp.to_rfc3339()),
115 status: ks.status.clone(),
116 }
117 }
118}
119
120impl From<&Table> for TableView {
121 fn from(t: &Table) -> Self {
122 Self {
123 keyspace_name: t.keyspace_name.clone(),
124 table_name: t.table_name.clone(),
125 arn: t.arn.clone(),
126 schema_definition: SchemaDefinitionView::from(&t.schema_definition),
127 capacity_mode: t.capacity_mode.clone(),
128 read_capacity_units: t.read_capacity_units,
129 write_capacity_units: t.write_capacity_units,
130 encryption_type: t.encryption_type.clone(),
131 kms_key_identifier: t.kms_key_identifier.clone(),
132 point_in_time_recovery_enabled: t.point_in_time_recovery_enabled,
133 ttl_status: t.ttl_status.clone(),
134 default_time_to_live: t.default_time_to_live,
135 comment: t.comment.clone(),
136 client_side_timestamps_enabled: t.client_side_timestamps_enabled,
137 tags: t.tags.clone(),
138 creation_timestamp: Some(t.creation_timestamp.to_rfc3339()),
139 status: t.status.clone(),
140 }
141 }
142}
143
144impl From<&SchemaDefinition> for SchemaDefinitionView {
145 fn from(s: &SchemaDefinition) -> Self {
146 Self {
147 all_columns: s
148 .all_columns
149 .iter()
150 .map(|c| ColumnDefinitionView {
151 name: c.name.clone(),
152 column_type: c.column_type.clone(),
153 })
154 .collect(),
155 partition_keys: s.partition_keys.clone(),
156 clustering_keys: s
157 .clustering_keys
158 .iter()
159 .map(|c| ClusteringKeyView {
160 name: c.name.clone(),
161 order_by: c.order_by.clone(),
162 })
163 .collect(),
164 static_columns: s.static_columns.clone(),
165 }
166 }
167}
168
169impl From<&UserDefinedType> for UserDefinedTypeView {
170 fn from(t: &UserDefinedType) -> Self {
171 Self {
172 keyspace_name: t.keyspace_name.clone(),
173 type_name: t.type_name.clone(),
174 field_definitions: t
175 .field_definitions
176 .iter()
177 .map(|f| FieldDefinitionView {
178 name: f.name.clone(),
179 field_type: f.field_type.clone(),
180 })
181 .collect(),
182 creation_timestamp: Some(t.creation_timestamp.to_rfc3339()),
183 status: t.status.clone(),
184 }
185 }
186}
187
188impl From<KeyspacesStateView> for KeyspacesState {
189 fn from(view: KeyspacesStateView) -> Self {
190 let keyspaces = view
191 .keyspaces
192 .into_iter()
193 .map(|(k, v)| {
194 let ks = Keyspace {
195 name: v.name,
196 arn: v.arn,
197 replication_strategy: v.replication_strategy,
198 replication_regions: v.replication_regions,
199 tags: v.tags,
200 creation_timestamp: v
201 .creation_timestamp
202 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
203 .map(|dt| dt.with_timezone(&chrono::Utc))
204 .unwrap_or_else(chrono::Utc::now),
205 status: v.status,
206 };
207 (k, ks)
208 })
209 .collect();
210
211 let tables = view
212 .tables
213 .into_values()
214 .map(|v| {
215 let table = Table {
216 keyspace_name: v.keyspace_name.clone(),
217 table_name: v.table_name.clone(),
218 arn: v.arn,
219 schema_definition: SchemaDefinition {
220 all_columns: v
221 .schema_definition
222 .all_columns
223 .into_iter()
224 .map(|c| ColumnDefinition {
225 name: c.name,
226 column_type: c.column_type,
227 })
228 .collect(),
229 partition_keys: v.schema_definition.partition_keys,
230 clustering_keys: v
231 .schema_definition
232 .clustering_keys
233 .into_iter()
234 .map(|c| ClusteringKey {
235 name: c.name,
236 order_by: c.order_by,
237 })
238 .collect(),
239 static_columns: v.schema_definition.static_columns,
240 },
241 capacity_mode: v.capacity_mode,
242 read_capacity_units: v.read_capacity_units,
243 write_capacity_units: v.write_capacity_units,
244 encryption_type: v.encryption_type,
245 kms_key_identifier: v.kms_key_identifier,
246 point_in_time_recovery_enabled: v.point_in_time_recovery_enabled,
247 ttl_status: v.ttl_status,
248 default_time_to_live: v.default_time_to_live,
249 comment: v.comment,
250 client_side_timestamps_enabled: v.client_side_timestamps_enabled,
251 tags: v.tags,
252 creation_timestamp: v
253 .creation_timestamp
254 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
255 .map(|dt| dt.with_timezone(&chrono::Utc))
256 .unwrap_or_else(chrono::Utc::now),
257 status: v.status,
258 };
259 (
260 (table.keyspace_name.clone(), table.table_name.clone()),
261 table,
262 )
263 })
264 .collect();
265
266 let types = view
267 .types
268 .into_values()
269 .map(|v| {
270 let udt = UserDefinedType {
271 keyspace_name: v.keyspace_name.clone(),
272 type_name: v.type_name.clone(),
273 field_definitions: v
274 .field_definitions
275 .into_iter()
276 .map(|f| FieldDefinition {
277 name: f.name,
278 field_type: f.field_type,
279 })
280 .collect(),
281 creation_timestamp: v
282 .creation_timestamp
283 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
284 .map(|dt| dt.with_timezone(&chrono::Utc))
285 .unwrap_or_else(chrono::Utc::now),
286 status: v.status,
287 };
288 ((udt.keyspace_name.clone(), udt.type_name.clone()), udt)
289 })
290 .collect();
291
292 KeyspacesState {
293 keyspaces,
294 tables,
295 types,
296 }
297 }
298}
299
300impl From<&KeyspacesState> for KeyspacesStateView {
301 fn from(state: &KeyspacesState) -> Self {
302 Self {
303 keyspaces: state
304 .keyspaces
305 .iter()
306 .map(|(k, v)| (k.clone(), KeyspaceView::from(v)))
307 .collect(),
308 tables: state
309 .tables
310 .iter()
311 .map(|((ks, tn), v)| (format!("{ks}/{tn}"), TableView::from(v)))
312 .collect(),
313 types: state
314 .types
315 .iter()
316 .map(|((ks, tn), v)| (format!("{ks}/{tn}"), UserDefinedTypeView::from(v)))
317 .collect(),
318 }
319 }
320}
321
322impl StatefulService for KeyspacesService {
323 type StateView = KeyspacesStateView;
324
325 async fn snapshot(&self, account_id: &str, region: &str) -> Self::StateView {
326 let state = self.state.get(account_id, region);
327 let guard = state.read().await;
328 KeyspacesStateView::from(&*guard)
329 }
330
331 async fn restore(
332 &self,
333 account_id: &str,
334 region: &str,
335 view: Self::StateView,
336 ) -> Result<(), StateViewError> {
337 let state = self.state.get(account_id, region);
338 {
339 let mut guard = state.write().await;
340 *guard = KeyspacesState::from(view);
341 }
342 self.notify_state_changed(account_id, region).await;
343 Ok(())
344 }
345
346 async fn merge(
347 &self,
348 account_id: &str,
349 region: &str,
350 view: Self::StateView,
351 ) -> Result<(), StateViewError> {
352 let state = self.state.get(account_id, region);
353 {
354 let mut guard = state.write().await;
355 for (name, ks_view) in view.keyspaces {
357 let ks = Keyspace {
358 name: ks_view.name,
359 arn: ks_view.arn,
360 replication_strategy: ks_view.replication_strategy,
361 replication_regions: ks_view.replication_regions,
362 tags: ks_view.tags,
363 creation_timestamp: ks_view
364 .creation_timestamp
365 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
366 .map(|dt| dt.with_timezone(&chrono::Utc))
367 .unwrap_or_else(chrono::Utc::now),
368 status: ks_view.status,
369 };
370 guard.keyspaces.insert(name, ks);
371 }
372 for tv in view.tables.into_values() {
374 let table = Table {
375 keyspace_name: tv.keyspace_name.clone(),
376 table_name: tv.table_name.clone(),
377 arn: tv.arn,
378 schema_definition: SchemaDefinition {
379 all_columns: tv
380 .schema_definition
381 .all_columns
382 .into_iter()
383 .map(|c| ColumnDefinition {
384 name: c.name,
385 column_type: c.column_type,
386 })
387 .collect(),
388 partition_keys: tv.schema_definition.partition_keys,
389 clustering_keys: tv
390 .schema_definition
391 .clustering_keys
392 .into_iter()
393 .map(|c| ClusteringKey {
394 name: c.name,
395 order_by: c.order_by,
396 })
397 .collect(),
398 static_columns: tv.schema_definition.static_columns,
399 },
400 capacity_mode: tv.capacity_mode,
401 read_capacity_units: tv.read_capacity_units,
402 write_capacity_units: tv.write_capacity_units,
403 encryption_type: tv.encryption_type,
404 kms_key_identifier: tv.kms_key_identifier,
405 point_in_time_recovery_enabled: tv.point_in_time_recovery_enabled,
406 ttl_status: tv.ttl_status,
407 default_time_to_live: tv.default_time_to_live,
408 comment: tv.comment,
409 client_side_timestamps_enabled: tv.client_side_timestamps_enabled,
410 tags: tv.tags,
411 creation_timestamp: tv
412 .creation_timestamp
413 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
414 .map(|dt| dt.with_timezone(&chrono::Utc))
415 .unwrap_or_else(chrono::Utc::now),
416 status: tv.status,
417 };
418 guard.tables.insert(
419 (table.keyspace_name.clone(), table.table_name.clone()),
420 table,
421 );
422 }
423 for tv in view.types.into_values() {
425 let udt = UserDefinedType {
426 keyspace_name: tv.keyspace_name.clone(),
427 type_name: tv.type_name.clone(),
428 field_definitions: tv
429 .field_definitions
430 .into_iter()
431 .map(|f| FieldDefinition {
432 name: f.name,
433 field_type: f.field_type,
434 })
435 .collect(),
436 creation_timestamp: tv
437 .creation_timestamp
438 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
439 .map(|dt| dt.with_timezone(&chrono::Utc))
440 .unwrap_or_else(chrono::Utc::now),
441 status: tv.status,
442 };
443 guard
444 .types
445 .insert((udt.keyspace_name.clone(), udt.type_name.clone()), udt);
446 }
447 }
448 self.notify_state_changed(account_id, region).await;
449 Ok(())
450 }
451
452 fn notifier(&self) -> &StateChangeNotifier<Self::StateView> {
453 &self.notifier
454 }
455}