winterbaume_redshiftdata/
views.rs1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use winterbaume_core::{StateChangeNotifier, StateViewError, StatefulService};
8
9use crate::handlers::RedshiftDataService;
10use crate::state::RedshiftDataState;
11use crate::types::StatementStatus;
12
13#[derive(Debug, Clone, Serialize, Deserialize, Default)]
15pub struct StatementParameterView {
16 pub name: String,
17 pub value: String,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22pub struct StatementView {
23 pub id: String,
24 pub sql: String,
25 pub cluster_identifier: Option<String>,
26 pub workgroup_name: Option<String>,
27 pub database: String,
28 pub db_user: Option<String>,
29 pub secret_arn: Option<String>,
30 pub status: String,
31 pub created_at: String,
32 pub updated_at: String,
33 pub result_rows: i64,
34 pub result_size: i64,
35 pub has_result_set: bool,
36 pub query_string: String,
37 #[serde(default)]
38 pub parameters: Vec<StatementParameterView>,
39 #[serde(default)]
40 pub result_columns: Vec<(String, String)>,
41 #[serde(default)]
42 pub result_data: Vec<Vec<Option<String>>>,
43 #[serde(default)]
44 pub error_message: Option<String>,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize, Default)]
49pub struct RedshiftDataStateView {
50 #[serde(default)]
51 pub statements: HashMap<String, StatementView>,
52 #[serde(default)]
53 pub databases: Vec<String>,
54 #[serde(default)]
55 pub schemas: Vec<String>,
56 #[serde(default)]
57 pub table_names: Vec<String>,
58 #[serde(default)]
59 pub table_columns: HashMap<String, Vec<(String, String)>>,
60}
61
62fn dt_to_string(dt: &DateTime<Utc>) -> String {
65 dt.to_rfc3339()
66}
67
68fn parse_dt(s: &str) -> DateTime<Utc> {
69 DateTime::parse_from_rfc3339(s)
70 .map(|dt| dt.with_timezone(&Utc))
71 .unwrap_or_else(|_| Utc::now())
72}
73
74impl From<&crate::types::StatementParameter> for StatementParameterView {
75 fn from(p: &crate::types::StatementParameter) -> Self {
76 StatementParameterView {
77 name: p.name.clone(),
78 value: p.value.clone(),
79 }
80 }
81}
82
83impl From<&crate::types::Statement> for StatementView {
84 fn from(s: &crate::types::Statement) -> Self {
85 StatementView {
86 id: s.id.clone(),
87 sql: s.sql.clone(),
88 cluster_identifier: s.cluster_identifier.clone(),
89 workgroup_name: s.workgroup_name.clone(),
90 database: s.database.clone(),
91 db_user: s.db_user.clone(),
92 secret_arn: s.secret_arn.clone(),
93 status: s.status.as_str().to_string(),
94 created_at: dt_to_string(&s.created_at),
95 updated_at: dt_to_string(&s.updated_at),
96 result_rows: s.result_rows,
97 result_size: s.result_size,
98 has_result_set: s.has_result_set,
99 query_string: s.query_string.clone(),
100 parameters: s
101 .parameters
102 .iter()
103 .map(StatementParameterView::from)
104 .collect(),
105 result_columns: s.result_columns.clone(),
106 result_data: s.result_data.clone(),
107 error_message: s.error_message.clone(),
108 }
109 }
110}
111
112impl From<&RedshiftDataState> for RedshiftDataStateView {
113 fn from(state: &RedshiftDataState) -> Self {
114 RedshiftDataStateView {
115 statements: state
116 .statements
117 .iter()
118 .map(|(k, v)| (k.clone(), StatementView::from(v)))
119 .collect(),
120 databases: state.databases.clone(),
121 schemas: state.schemas.clone(),
122 table_names: state.table_names.clone(),
123 table_columns: state.table_columns.clone(),
124 }
125 }
126}
127
128fn parse_status(s: &str) -> StatementStatus {
131 match s {
132 "SUBMITTED" => StatementStatus::Submitted,
133 "STARTED" => StatementStatus::Started,
134 "FINISHED" => StatementStatus::Finished,
135 "FAILED" => StatementStatus::Failed,
136 "ABORTED" => StatementStatus::Aborted,
137 _ => StatementStatus::Finished,
138 }
139}
140
141impl From<StatementParameterView> for crate::types::StatementParameter {
142 fn from(v: StatementParameterView) -> Self {
143 crate::types::StatementParameter {
144 name: v.name,
145 value: v.value,
146 }
147 }
148}
149
150impl From<StatementView> for crate::types::Statement {
151 fn from(v: StatementView) -> Self {
152 crate::types::Statement {
153 id: v.id,
154 sql: v.sql,
155 cluster_identifier: v.cluster_identifier,
156 workgroup_name: v.workgroup_name,
157 database: v.database,
158 db_user: v.db_user,
159 secret_arn: v.secret_arn,
160 status: parse_status(&v.status),
161 created_at: parse_dt(&v.created_at),
162 updated_at: parse_dt(&v.updated_at),
163 result_rows: v.result_rows,
164 result_size: v.result_size,
165 has_result_set: v.has_result_set,
166 query_string: v.query_string,
167 parameters: v
168 .parameters
169 .into_iter()
170 .map(crate::types::StatementParameter::from)
171 .collect(),
172 sqls: vec![],
173 statement_name: None,
174 is_batch: false,
175 result_columns: v.result_columns,
176 result_data: v.result_data,
177 error_message: v.error_message,
178 }
179 }
180}
181
182impl From<RedshiftDataStateView> for RedshiftDataState {
183 fn from(view: RedshiftDataStateView) -> Self {
184 RedshiftDataState {
185 statements: view
186 .statements
187 .into_iter()
188 .map(|(k, v)| (k, crate::types::Statement::from(v)))
189 .collect(),
190 databases: view.databases,
191 schemas: view.schemas,
192 table_names: view.table_names,
193 table_columns: view.table_columns,
194 }
195 }
196}
197
198impl StatefulService for RedshiftDataService {
201 type StateView = RedshiftDataStateView;
202
203 async fn snapshot(&self, account_id: &str, region: &str) -> Self::StateView {
204 let state = self.state.get(account_id, region);
205 let guard = state.read().await;
206 RedshiftDataStateView::from(&*guard)
207 }
208
209 async fn restore(
210 &self,
211 account_id: &str,
212 region: &str,
213 view: Self::StateView,
214 ) -> Result<(), StateViewError> {
215 let state = self.state.get(account_id, region);
216 {
217 let mut guard = state.write().await;
218 *guard = RedshiftDataState::from(view);
219 }
220 self.notify_state_changed(account_id, region).await;
221 Ok(())
222 }
223
224 async fn merge(
225 &self,
226 account_id: &str,
227 region: &str,
228 view: Self::StateView,
229 ) -> Result<(), StateViewError> {
230 let state = self.state.get(account_id, region);
231 {
232 let mut guard = state.write().await;
233 for (id, stmt_view) in view.statements {
234 guard
235 .statements
236 .insert(id, crate::types::Statement::from(stmt_view));
237 }
238 if !view.databases.is_empty() {
239 guard.databases = view.databases;
240 }
241 if !view.schemas.is_empty() {
242 guard.schemas = view.schemas;
243 }
244 if !view.table_names.is_empty() {
245 guard.table_names = view.table_names;
246 }
247 for (table_name, columns) in view.table_columns {
248 guard.table_columns.insert(table_name, columns);
249 }
250 }
251 self.notify_state_changed(account_id, region).await;
252 Ok(())
253 }
254
255 fn notifier(&self) -> &StateChangeNotifier<Self::StateView> {
256 &self.notifier
257 }
258}