Skip to main content

winterbaume_redshiftdata/
views.rs

1//! Serde-compatible view types for Redshift Data state snapshots.
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use winterbaume_core::{StateChangeNotifier, StateViewError, StatefulService};
8
9use crate::handlers::RedshiftDataService;
10use crate::state::RedshiftDataState;
11use crate::types::StatementStatus;
12
13/// Serializable view of a statement parameter.
14#[derive(Debug, Clone, Serialize, Deserialize, Default)]
15pub struct StatementParameterView {
16    pub name: String,
17    pub value: String,
18}
19
20/// Serializable view of a statement.
21#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22pub struct StatementView {
23    pub id: String,
24    pub sql: String,
25    pub cluster_identifier: Option<String>,
26    pub workgroup_name: Option<String>,
27    pub database: String,
28    pub db_user: Option<String>,
29    pub secret_arn: Option<String>,
30    pub status: String,
31    pub created_at: String,
32    pub updated_at: String,
33    pub result_rows: i64,
34    pub result_size: i64,
35    pub has_result_set: bool,
36    pub query_string: String,
37    #[serde(default)]
38    pub parameters: Vec<StatementParameterView>,
39    #[serde(default)]
40    pub result_columns: Vec<(String, String)>,
41    #[serde(default)]
42    pub result_data: Vec<Vec<Option<String>>>,
43    #[serde(default)]
44    pub error_message: Option<String>,
45}
46
47/// Serializable view of the entire Redshift Data state for one account/region.
48#[derive(Debug, Clone, Serialize, Deserialize, Default)]
49pub struct RedshiftDataStateView {
50    #[serde(default)]
51    pub statements: HashMap<String, StatementView>,
52    #[serde(default)]
53    pub databases: Vec<String>,
54    #[serde(default)]
55    pub schemas: Vec<String>,
56    #[serde(default)]
57    pub table_names: Vec<String>,
58    #[serde(default)]
59    pub table_columns: HashMap<String, Vec<(String, String)>>,
60}
61
62// --- From internal types to view types ---
63
64fn dt_to_string(dt: &DateTime<Utc>) -> String {
65    dt.to_rfc3339()
66}
67
68fn parse_dt(s: &str) -> DateTime<Utc> {
69    DateTime::parse_from_rfc3339(s)
70        .map(|dt| dt.with_timezone(&Utc))
71        .unwrap_or_else(|_| Utc::now())
72}
73
74impl From<&crate::types::StatementParameter> for StatementParameterView {
75    fn from(p: &crate::types::StatementParameter) -> Self {
76        StatementParameterView {
77            name: p.name.clone(),
78            value: p.value.clone(),
79        }
80    }
81}
82
83impl From<&crate::types::Statement> for StatementView {
84    fn from(s: &crate::types::Statement) -> Self {
85        StatementView {
86            id: s.id.clone(),
87            sql: s.sql.clone(),
88            cluster_identifier: s.cluster_identifier.clone(),
89            workgroup_name: s.workgroup_name.clone(),
90            database: s.database.clone(),
91            db_user: s.db_user.clone(),
92            secret_arn: s.secret_arn.clone(),
93            status: s.status.as_str().to_string(),
94            created_at: dt_to_string(&s.created_at),
95            updated_at: dt_to_string(&s.updated_at),
96            result_rows: s.result_rows,
97            result_size: s.result_size,
98            has_result_set: s.has_result_set,
99            query_string: s.query_string.clone(),
100            parameters: s
101                .parameters
102                .iter()
103                .map(StatementParameterView::from)
104                .collect(),
105            result_columns: s.result_columns.clone(),
106            result_data: s.result_data.clone(),
107            error_message: s.error_message.clone(),
108        }
109    }
110}
111
112impl From<&RedshiftDataState> for RedshiftDataStateView {
113    fn from(state: &RedshiftDataState) -> Self {
114        RedshiftDataStateView {
115            statements: state
116                .statements
117                .iter()
118                .map(|(k, v)| (k.clone(), StatementView::from(v)))
119                .collect(),
120            databases: state.databases.clone(),
121            schemas: state.schemas.clone(),
122            table_names: state.table_names.clone(),
123            table_columns: state.table_columns.clone(),
124        }
125    }
126}
127
128// --- From view types to internal types ---
129
130fn parse_status(s: &str) -> StatementStatus {
131    match s {
132        "SUBMITTED" => StatementStatus::Submitted,
133        "STARTED" => StatementStatus::Started,
134        "FINISHED" => StatementStatus::Finished,
135        "FAILED" => StatementStatus::Failed,
136        "ABORTED" => StatementStatus::Aborted,
137        _ => StatementStatus::Finished,
138    }
139}
140
141impl From<StatementParameterView> for crate::types::StatementParameter {
142    fn from(v: StatementParameterView) -> Self {
143        crate::types::StatementParameter {
144            name: v.name,
145            value: v.value,
146        }
147    }
148}
149
150impl From<StatementView> for crate::types::Statement {
151    fn from(v: StatementView) -> Self {
152        crate::types::Statement {
153            id: v.id,
154            sql: v.sql,
155            cluster_identifier: v.cluster_identifier,
156            workgroup_name: v.workgroup_name,
157            database: v.database,
158            db_user: v.db_user,
159            secret_arn: v.secret_arn,
160            status: parse_status(&v.status),
161            created_at: parse_dt(&v.created_at),
162            updated_at: parse_dt(&v.updated_at),
163            result_rows: v.result_rows,
164            result_size: v.result_size,
165            has_result_set: v.has_result_set,
166            query_string: v.query_string,
167            parameters: v
168                .parameters
169                .into_iter()
170                .map(crate::types::StatementParameter::from)
171                .collect(),
172            sqls: vec![],
173            statement_name: None,
174            is_batch: false,
175            result_columns: v.result_columns,
176            result_data: v.result_data,
177            error_message: v.error_message,
178        }
179    }
180}
181
182impl From<RedshiftDataStateView> for RedshiftDataState {
183    fn from(view: RedshiftDataStateView) -> Self {
184        RedshiftDataState {
185            statements: view
186                .statements
187                .into_iter()
188                .map(|(k, v)| (k, crate::types::Statement::from(v)))
189                .collect(),
190            databases: view.databases,
191            schemas: view.schemas,
192            table_names: view.table_names,
193            table_columns: view.table_columns,
194        }
195    }
196}
197
198// --- StatefulService implementation ---
199
200impl StatefulService for RedshiftDataService {
201    type StateView = RedshiftDataStateView;
202
203    async fn snapshot(&self, account_id: &str, region: &str) -> Self::StateView {
204        let state = self.state.get(account_id, region);
205        let guard = state.read().await;
206        RedshiftDataStateView::from(&*guard)
207    }
208
209    async fn restore(
210        &self,
211        account_id: &str,
212        region: &str,
213        view: Self::StateView,
214    ) -> Result<(), StateViewError> {
215        let state = self.state.get(account_id, region);
216        {
217            let mut guard = state.write().await;
218            *guard = RedshiftDataState::from(view);
219        }
220        self.notify_state_changed(account_id, region).await;
221        Ok(())
222    }
223
224    async fn merge(
225        &self,
226        account_id: &str,
227        region: &str,
228        view: Self::StateView,
229    ) -> Result<(), StateViewError> {
230        let state = self.state.get(account_id, region);
231        {
232            let mut guard = state.write().await;
233            for (id, stmt_view) in view.statements {
234                guard
235                    .statements
236                    .insert(id, crate::types::Statement::from(stmt_view));
237            }
238            if !view.databases.is_empty() {
239                guard.databases = view.databases;
240            }
241            if !view.schemas.is_empty() {
242                guard.schemas = view.schemas;
243            }
244            if !view.table_names.is_empty() {
245                guard.table_names = view.table_names;
246            }
247            for (table_name, columns) in view.table_columns {
248                guard.table_columns.insert(table_name, columns);
249            }
250        }
251        self.notify_state_changed(account_id, region).await;
252        Ok(())
253    }
254
255    fn notifier(&self) -> &StateChangeNotifier<Self::StateView> {
256        &self.notifier
257    }
258}