1use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7use http::StatusCode;
8use serde_json::{json, Value};
9use tokio::sync::Mutex as AsyncMutex;
10
11use fakecloud_cloudformation::CloudFormationService;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::SnapshotStore;
14
15use crate::patch::apply_json_patch;
16use crate::persistence::save_snapshot;
17use crate::state::{CloudControlState, ManagedResource, ResourceRequest, SharedCloudControlState};
18
/// Names of the Cloud Control API actions this service handles.
///
/// Returned as-is by `supported_actions`; every entry has a matching arm in
/// the `handle` dispatcher.
pub const CLOUDCONTROL_ACTIONS: &[&str] = &[
    "CreateResource",
    "GetResource",
    "UpdateResource",
    "DeleteResource",
    "ListResources",
    "GetResourceRequestStatus",
    "ListResourceRequests",
    "CancelResourceRequest",
];
30
/// Cloud Control API service front-end.
///
/// Create/update/delete operations are delegated to the wrapped
/// [`CloudFormationService`]; the resulting resources and request records are
/// kept in the shared Cloud Control state.
pub struct CloudControlService {
    /// Backend that performs the actual resource create/update/delete.
    cfn: Arc<CloudFormationService>,
    /// Per-account managed resources and request records.
    state: SharedCloudControlState,
    /// Optional snapshot backend handed to `save_snapshot`; `None` until
    /// `with_snapshot_store` is called.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex passed to `save_snapshot` together with the state
    /// (presumably serializes concurrent snapshot writes — confirm in
    /// `crate::persistence`).
    snapshot_lock: Arc<AsyncMutex<()>>,
}
37
38impl CloudControlService {
39 pub fn new(cfn: Arc<CloudFormationService>, state: SharedCloudControlState) -> Self {
40 Self {
41 cfn,
42 state,
43 snapshot_store: None,
44 snapshot_lock: Arc::new(AsyncMutex::new(())),
45 }
46 }
47
48 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
49 self.snapshot_store = Some(store);
50 self
51 }
52
53 async fn persist(&self) {
54 save_snapshot(
55 &self.state,
56 self.snapshot_store.clone(),
57 &self.snapshot_lock,
58 )
59 .await;
60 }
61
62 async fn create_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
65 let body = parse_json(&req.body)?;
66 validate_type_name(&body)?;
69 validate_len(&body, "DesiredState", 1, 262144)?;
70 validate_len(&body, "ClientToken", 1, 128)?;
71 validate_len(&body, "RoleArn", 20, 2048)?;
72 let type_name = require_str(&body, "TypeName")?;
73 let desired = require_str(&body, "DesiredState")?;
74 let desired_state: Value = serde_json::from_str(desired)
75 .map_err(|e| invalid_request(&format!("DesiredState is not valid JSON: {e}")))?;
76 let client_token = opt_str(&body, "ClientToken");
77 let fingerprint =
78 fingerprint_of("CREATE", type_name, None, Some(&desired_state.to_string()));
79
80 if let Some(token) = &client_token {
83 if let Some(resp) =
84 self.client_token_replay_or_conflict(&req.account_id, token, &fingerprint)
85 {
86 return resp;
87 }
88 }
89
90 let request_token = new_token();
91 let outcome = self.cfn.cloudcontrol_create(
92 type_name,
93 desired_state.clone(),
94 &req.account_id,
95 &req.region,
96 );
97
98 let record = match outcome {
99 Ok(res) => {
100 let managed = ManagedResource {
101 type_name: type_name.to_string(),
102 identifier: res.physical_id.clone(),
103 properties: desired_state.clone(),
104 attributes: res.attributes.clone(),
105 created_at: Utc::now(),
106 };
107 let mut accounts = self.state.write();
108 let st = accounts.get_or_create(&req.account_id);
109 st.resources.insert(
110 CloudControlState::resource_key(type_name, &res.physical_id),
111 managed,
112 );
113 success_request(
114 &request_token,
115 type_name,
116 Some(res.physical_id),
117 "CREATE",
118 Some(desired_state),
119 client_token,
120 Some(fingerprint),
121 )
122 }
123 Err(msg) => failed_request(
124 &request_token,
125 type_name,
126 None,
127 "CREATE",
128 &msg,
129 client_token,
130 Some(fingerprint),
131 ),
132 };
133
134 self.store_request(&req.account_id, record.clone());
135 if record.operation_status == "SUCCESS" {
136 self.cfn.cloudcontrol_persist_type(type_name).await;
139 }
140 self.persist().await;
141 Ok(progress_event_response(&record))
142 }
143
144 fn get_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
147 let body = parse_json(&req.body)?;
148 validate_type_name(&body)?;
149 validate_len(&body, "Identifier", 1, 1024)?;
150 let type_name = require_str(&body, "TypeName")?;
151 let identifier = require_str(&body, "Identifier")?;
152 let accounts = self.state.read();
153 let managed = accounts
154 .get(&req.account_id)
155 .and_then(|s| {
156 s.resources
157 .get(&CloudControlState::resource_key(type_name, identifier))
158 })
159 .ok_or_else(|| resource_not_found(type_name, identifier))?;
160 Ok(AwsResponse::json_value(
161 StatusCode::OK,
162 json!({
163 "TypeName": type_name,
164 "ResourceDescription": resource_description(managed),
165 }),
166 ))
167 }
168
169 async fn update_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
172 let body = parse_json(&req.body)?;
173 validate_type_name(&body)?;
174 validate_len(&body, "Identifier", 1, 1024)?;
175 validate_len(&body, "PatchDocument", 1, 262144)?;
176 validate_len(&body, "ClientToken", 1, 128)?;
177 validate_len(&body, "RoleArn", 20, 2048)?;
178 let type_name = require_str(&body, "TypeName")?;
179 let identifier = require_str(&body, "Identifier")?;
180 let patch_str = require_str(&body, "PatchDocument")?;
181 let patch: Value = serde_json::from_str(patch_str)
182 .map_err(|e| invalid_request(&format!("PatchDocument is not valid JSON: {e}")))?;
183 let client_token = opt_str(&body, "ClientToken");
184 let fingerprint = fingerprint_of("UPDATE", type_name, Some(identifier), Some(patch_str));
185
186 if let Some(token) = &client_token {
187 if let Some(resp) =
188 self.client_token_replay_or_conflict(&req.account_id, token, &fingerprint)
189 {
190 return resp;
191 }
192 }
193
194 let key = CloudControlState::resource_key(type_name, identifier);
196 let (mut properties, attributes) = {
197 let accounts = self.state.read();
198 let managed = accounts
199 .get(&req.account_id)
200 .and_then(|s| s.resources.get(&key))
201 .ok_or_else(|| resource_not_found(type_name, identifier))?;
202 (managed.properties.clone(), managed.attributes.clone())
203 };
204
205 apply_json_patch(&mut properties, &patch)
206 .map_err(|e| invalid_request(&format!("invalid PatchDocument: {e}")))?;
207
208 let request_token = new_token();
209 let outcome = self.cfn.cloudcontrol_update(
210 type_name,
211 identifier,
212 &attributes,
213 properties.clone(),
214 &req.account_id,
215 &req.region,
216 );
217
218 let record = match outcome {
219 Ok(res) => {
220 let new_id = res.physical_id.clone();
221 let mut accounts = self.state.write();
222 let st = accounts.get_or_create(&req.account_id);
223 if new_id != identifier {
224 let mut managed = st.resources.remove(&key).unwrap_or(ManagedResource {
228 type_name: type_name.to_string(),
229 identifier: new_id.clone(),
230 properties: properties.clone(),
231 attributes: res.attributes.clone(),
232 created_at: Utc::now(),
233 });
234 managed.identifier = new_id.clone();
235 managed.properties = properties.clone();
236 managed.attributes = res.attributes.clone();
237 st.resources
238 .insert(CloudControlState::resource_key(type_name, &new_id), managed);
239 } else if let Some(managed) = st.resources.get_mut(&key) {
240 managed.properties = properties.clone();
241 managed.attributes = res.attributes.clone();
242 }
243 success_request(
244 &request_token,
245 type_name,
246 Some(new_id),
247 "UPDATE",
248 Some(properties),
249 client_token,
250 Some(fingerprint),
251 )
252 }
253 Err(msg) => failed_request(
254 &request_token,
255 type_name,
256 Some(identifier.to_string()),
257 "UPDATE",
258 &msg,
259 client_token,
260 Some(fingerprint),
261 ),
262 };
263
264 self.store_request(&req.account_id, record.clone());
265 if record.operation_status == "SUCCESS" {
266 self.cfn.cloudcontrol_persist_type(type_name).await;
267 }
268 self.persist().await;
269 Ok(progress_event_response(&record))
270 }
271
272 async fn delete_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
275 let body = parse_json(&req.body)?;
276 validate_type_name(&body)?;
277 validate_len(&body, "Identifier", 1, 1024)?;
278 validate_len(&body, "ClientToken", 1, 128)?;
279 validate_len(&body, "RoleArn", 20, 2048)?;
280 let type_name = require_str(&body, "TypeName")?;
281 let identifier = require_str(&body, "Identifier")?;
282 let client_token = opt_str(&body, "ClientToken");
283 let fingerprint = fingerprint_of("DELETE", type_name, Some(identifier), None);
284
285 if let Some(token) = &client_token {
286 if let Some(resp) =
287 self.client_token_replay_or_conflict(&req.account_id, token, &fingerprint)
288 {
289 return resp;
290 }
291 }
292
293 let key = CloudControlState::resource_key(type_name, identifier);
294 let attributes = {
295 let accounts = self.state.read();
296 let managed = accounts
297 .get(&req.account_id)
298 .and_then(|s| s.resources.get(&key))
299 .ok_or_else(|| resource_not_found(type_name, identifier))?;
300 managed.attributes.clone()
301 };
302
303 let request_token = new_token();
304 let outcome = self.cfn.cloudcontrol_delete(
305 type_name,
306 identifier,
307 &attributes,
308 &req.account_id,
309 &req.region,
310 );
311
312 let record = match outcome {
313 Ok(()) => {
314 let mut accounts = self.state.write();
315 let st = accounts.get_or_create(&req.account_id);
316 st.resources.remove(&key);
317 success_request(
318 &request_token,
319 type_name,
320 Some(identifier.to_string()),
321 "DELETE",
322 None,
323 client_token,
324 Some(fingerprint),
325 )
326 }
327 Err(msg) => failed_request(
328 &request_token,
329 type_name,
330 Some(identifier.to_string()),
331 "DELETE",
332 &msg,
333 client_token,
334 Some(fingerprint),
335 ),
336 };
337
338 self.store_request(&req.account_id, record.clone());
339 if record.operation_status == "SUCCESS" {
340 self.cfn.cloudcontrol_persist_type(type_name).await;
341 }
342 self.persist().await;
343 Ok(progress_event_response(&record))
344 }
345
346 fn list_resources(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
349 let body = parse_json(&req.body)?;
350 validate_type_name(&body)?;
352 validate_len(&body, "TypeVersionId", 1, 128)?;
353 validate_len(&body, "RoleArn", 20, 2048)?;
354 validate_len(&body, "NextToken", 1, 4096)?;
355 validate_len(&body, "ResourceModel", 1, 262144)?;
356 validate_max_results(&body)?;
357 let type_name = require_str(&body, "TypeName")?;
358 let all: Vec<Value> = {
359 let accounts = self.state.read();
360 accounts
361 .get(&req.account_id)
362 .map(|s| {
363 s.resources
364 .values()
365 .filter(|m| m.type_name == type_name)
366 .map(resource_description)
367 .collect()
368 })
369 .unwrap_or_default()
370 };
371 let (page, next) = paginate(all, &body);
372 let mut resp = json!({ "TypeName": type_name, "ResourceDescriptions": page });
373 if let Some(nt) = next {
374 resp["NextToken"] = json!(nt);
375 }
376 Ok(AwsResponse::json_value(StatusCode::OK, resp))
377 }
378
379 fn get_request_status(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
382 let body = parse_json(&req.body)?;
383 let token = require_str(&body, "RequestToken")?;
384 let accounts = self.state.read();
385 let record = accounts
386 .get(&req.account_id)
387 .and_then(|s| s.requests.get(token))
388 .ok_or_else(|| request_token_not_found(token))?;
389 Ok(progress_event_response(record))
390 }
391
392 fn list_requests(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
393 let body = parse_json(&req.body)?;
394 validate_len(&body, "NextToken", 1, 2048)?;
395 validate_max_results(&body)?;
396 let filter_statuses: Vec<String> = body
397 .get("ResourceRequestStatusFilter")
398 .and_then(|f| f.get("OperationStatuses"))
399 .and_then(|v| v.as_array())
400 .map(|a| {
401 a.iter()
402 .filter_map(|s| s.as_str().map(String::from))
403 .collect()
404 })
405 .unwrap_or_default();
406 let filter_ops: Vec<String> = body
407 .get("ResourceRequestStatusFilter")
408 .and_then(|f| f.get("Operations"))
409 .and_then(|v| v.as_array())
410 .map(|a| {
411 a.iter()
412 .filter_map(|s| s.as_str().map(String::from))
413 .collect()
414 })
415 .unwrap_or_default();
416 let all: Vec<Value> = {
417 let accounts = self.state.read();
418 accounts
419 .get(&req.account_id)
420 .map(|s| {
421 s.requests
422 .values()
423 .filter(|r| {
424 filter_statuses.is_empty()
425 || filter_statuses.contains(&r.operation_status)
426 })
427 .filter(|r| filter_ops.is_empty() || filter_ops.contains(&r.operation))
428 .map(progress_event_json)
429 .collect()
430 })
431 .unwrap_or_default()
432 };
433 let (page, next) = paginate(all, &body);
434 let mut resp = json!({ "ResourceRequestStatusSummaries": page });
435 if let Some(nt) = next {
436 resp["NextToken"] = json!(nt);
437 }
438 Ok(AwsResponse::json_value(StatusCode::OK, resp))
439 }
440
441 async fn cancel_request(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
442 let body = parse_json(&req.body)?;
443 let token = require_str(&body, "RequestToken")?;
444 let (record, mutated) = {
451 let mut accounts = self.state.write();
452 let st = accounts.get_or_create(&req.account_id);
453 let r = st
454 .requests
455 .get_mut(token)
456 .ok_or_else(|| request_token_not_found(token))?;
457 let mutated = if !is_terminal(&r.operation_status) {
458 r.operation_status = "CANCEL_COMPLETE".to_string();
459 true
460 } else {
461 false
462 };
463 (r.clone(), mutated)
464 };
465 if mutated {
466 self.persist().await;
467 }
468 Ok(progress_event_response(&record))
469 }
470
471 fn store_request(&self, account_id: &str, record: ResourceRequest) {
474 let mut accounts = self.state.write();
475 let st = accounts.get_or_create(account_id);
476 st.requests.insert(record.request_token.clone(), record);
477 }
478
479 fn find_by_client_token(&self, account_id: &str, token: &str) -> Option<ResourceRequest> {
480 let accounts = self.state.read();
481 accounts.get(account_id).and_then(|s| {
482 s.requests
483 .values()
484 .find(|r| r.client_token.as_deref() == Some(token))
485 .cloned()
486 })
487 }
488
489 fn client_token_replay_or_conflict(
493 &self,
494 account_id: &str,
495 token: &str,
496 fingerprint: &str,
497 ) -> Option<Result<AwsResponse, AwsServiceError>> {
498 let existing = self.find_by_client_token(account_id, token)?;
499 match existing.fingerprint.as_deref() {
500 None => Some(Ok(progress_event_response(&existing))),
503 Some(fp) if fp == fingerprint => Some(Ok(progress_event_response(&existing))),
504 Some(_) => Some(Err(client_token_conflict(token))),
505 }
506 }
507}
508
509#[async_trait]
510impl AwsService for CloudControlService {
511 fn service_name(&self) -> &str {
512 "cloudcontrolapi"
513 }
514
515 fn supported_actions(&self) -> &[&str] {
516 CLOUDCONTROL_ACTIONS
517 }
518
519 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
520 match req.action.as_str() {
521 "CreateResource" => self.create_resource(&req).await,
522 "GetResource" => self.get_resource(&req),
523 "UpdateResource" => self.update_resource(&req).await,
524 "DeleteResource" => self.delete_resource(&req).await,
525 "ListResources" => self.list_resources(&req),
526 "GetResourceRequestStatus" => self.get_request_status(&req),
527 "ListResourceRequests" => self.list_requests(&req),
528 "CancelResourceRequest" => self.cancel_request(&req).await,
529 other => Err(AwsServiceError::aws_error(
530 StatusCode::BAD_REQUEST,
531 "InvalidRequestException",
532 format!("Unknown operation: {other}"),
533 )),
534 }
535 }
536}
537
538#[allow(clippy::too_many_arguments)]
543fn success_request(
544 token: &str,
545 type_name: &str,
546 identifier: Option<String>,
547 operation: &str,
548 resource_model: Option<Value>,
549 client_token: Option<String>,
550 fingerprint: Option<String>,
551) -> ResourceRequest {
552 ResourceRequest {
553 request_token: token.to_string(),
554 type_name: type_name.to_string(),
555 identifier,
556 operation: operation.to_string(),
557 operation_status: "SUCCESS".to_string(),
558 event_time: Utc::now(),
559 resource_model,
560 status_message: None,
561 error_code: None,
562 client_token,
563 fingerprint,
564 }
565}
566
567fn failed_request(
568 token: &str,
569 type_name: &str,
570 identifier: Option<String>,
571 operation: &str,
572 message: &str,
573 client_token: Option<String>,
574 fingerprint: Option<String>,
575) -> ResourceRequest {
576 ResourceRequest {
577 request_token: token.to_string(),
578 type_name: type_name.to_string(),
579 identifier,
580 operation: operation.to_string(),
581 operation_status: "FAILED".to_string(),
582 event_time: Utc::now(),
583 resource_model: None,
584 status_message: Some(message.to_string()),
585 error_code: Some("GeneralServiceException".to_string()),
586 client_token,
587 fingerprint,
588 }
589}
590
/// Builds the fingerprint string used to compare requests that share a client
/// token: operation, type name, identifier and payload joined by the ASCII
/// unit separator (U+001F). A missing identifier or payload contributes an
/// empty field.
fn fingerprint_of(
    operation: &str,
    type_name: &str,
    identifier: Option<&str>,
    payload: Option<&str>,
) -> String {
    [
        operation,
        type_name,
        identifier.unwrap_or_default(),
        payload.unwrap_or_default(),
    ]
    .join("\u{1f}")
}
605
606fn progress_event_json(record: &ResourceRequest) -> Value {
607 let mut ev = json!({
608 "TypeName": record.type_name,
609 "RequestToken": record.request_token,
610 "Operation": record.operation,
611 "OperationStatus": record.operation_status,
612 "EventTime": record.event_time.timestamp() as f64
613 + record.event_time.timestamp_subsec_millis() as f64 / 1000.0,
614 });
615 if let Some(id) = &record.identifier {
616 ev["Identifier"] = json!(id);
617 }
618 if let Some(model) = &record.resource_model {
619 ev["ResourceModel"] = json!(model.to_string());
621 }
622 if let Some(msg) = &record.status_message {
623 ev["StatusMessage"] = json!(msg);
624 }
625 if let Some(code) = &record.error_code {
626 ev["ErrorCode"] = json!(code);
627 }
628 ev
629}
630
631fn progress_event_response(record: &ResourceRequest) -> AwsResponse {
632 AwsResponse::json_value(
633 StatusCode::OK,
634 json!({ "ProgressEvent": progress_event_json(record) }),
635 )
636}
637
638fn resource_description(managed: &ManagedResource) -> Value {
639 json!({
640 "Identifier": managed.identifier,
641 "Properties": managed.properties.to_string(),
643 })
644}
645
/// Reports whether `status` is one of the final operation statuses
/// (`SUCCESS`, `FAILED`, `CANCEL_COMPLETE`). The comparison is exact and
/// case-sensitive.
fn is_terminal(status: &str) -> bool {
    const TERMINAL: [&str; 3] = ["SUCCESS", "FAILED", "CANCEL_COMPLETE"];
    TERMINAL.contains(&status)
}
649
650fn parse_json(body: &[u8]) -> Result<Value, AwsServiceError> {
655 if body.is_empty() {
656 return Ok(json!({}));
657 }
658 serde_json::from_slice(body).map_err(|e| invalid_request(&format!("invalid request body: {e}")))
659}
660
661fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
662 body.get(field)
663 .and_then(|v| v.as_str())
664 .filter(|s| !s.is_empty())
665 .ok_or_else(|| invalid_request(&format!("{field} is required.")))
666}
667
668fn opt_str(body: &Value, field: &str) -> Option<String> {
669 body.get(field).and_then(|v| v.as_str()).map(String::from)
670}
671
672fn new_token() -> String {
673 uuid::Uuid::new_v4().to_string()
674}
675
676fn validate_len(body: &Value, field: &str, min: usize, max: usize) -> Result<(), AwsServiceError> {
678 if let Some(s) = body.get(field).and_then(|v| v.as_str()) {
679 if s.len() < min || s.len() > max {
680 return Err(invalid_request(&format!(
681 "Value at '{field}' failed to satisfy constraint: Member must have length between {min} and {max}."
682 )));
683 }
684 }
685 Ok(())
686}
687
688fn validate_type_name(body: &Value) -> Result<(), AwsServiceError> {
691 let name = require_str(body, "TypeName")?;
692 if name.len() < 10 || name.len() > 196 || !is_valid_type_name(name) {
693 return Err(invalid_request(
694 "Value at 'TypeName' failed to satisfy constraint: Member must match the resource type name pattern (e.g. AWS::S3::Bucket).",
695 ));
696 }
697 Ok(())
698}
699
/// Reports whether `name` consists of exactly three `::`-separated segments,
/// each 2 to 64 ASCII alphanumeric characters long.
fn is_valid_type_name(name: &str) -> bool {
    let mut segments = 0usize;
    for segment in name.split("::") {
        segments += 1;
        let length_ok = (2..=64).contains(&segment.len());
        let charset_ok = segment.bytes().all(|b| b.is_ascii_alphanumeric());
        if segments > 3 || !length_ok || !charset_ok {
            return false;
        }
    }
    segments == 3
}
708
709fn validate_max_results(body: &Value) -> Result<(), AwsServiceError> {
711 if let Some(v) = body.get("MaxResults") {
712 let n = v
713 .as_i64()
714 .ok_or_else(|| invalid_request("MaxResults must be an integer."))?;
715 if !(1..=100).contains(&n) {
716 return Err(invalid_request("MaxResults must be between 1 and 100."));
717 }
718 }
719 Ok(())
720}
721
722fn paginate(items: Vec<Value>, body: &Value) -> (Vec<Value>, Option<String>) {
730 let max = body
731 .get("MaxResults")
732 .and_then(|v| v.as_i64())
733 .map(|n| n as usize)
734 .unwrap_or(100);
735 let start = match body.get("NextToken").and_then(|v| v.as_str()) {
736 Some(t) => match t.parse::<usize>() {
740 Ok(n) => n,
741 Err(_) => return (Vec::new(), None),
742 },
743 None => 0,
744 };
745 let total = items.len();
746 if start >= total {
747 return (Vec::new(), None);
748 }
749 let end = start.saturating_add(max).min(total);
750 let next = if end < total {
751 Some(end.to_string())
752 } else {
753 None
754 };
755 (items[start..end].to_vec(), next)
756}
757
758fn invalid_request(msg: &str) -> AwsServiceError {
759 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidRequestException", msg)
760}
761
762fn resource_not_found(type_name: &str, identifier: &str) -> AwsServiceError {
763 AwsServiceError::aws_error(
764 StatusCode::NOT_FOUND,
765 "ResourceNotFoundException",
766 format!("Resource of type '{type_name}' with identifier '{identifier}' was not found."),
767 )
768}
769
770fn client_token_conflict(token: &str) -> AwsServiceError {
771 AwsServiceError::aws_error(
772 StatusCode::BAD_REQUEST,
773 "ClientTokenConflictException",
774 format!("The client token '{token}' is already in use with different request parameters."),
775 )
776}
777
778fn request_token_not_found(token: &str) -> AwsServiceError {
779 AwsServiceError::aws_error(
780 StatusCode::NOT_FOUND,
781 "RequestTokenNotFoundException",
782 format!("Request token '{token}' was not found."),
783 )
784}