1use chrono::Utc;
4use http::header::{ETAG, LOCATION};
5use http::{HeaderMap, HeaderValue, StatusCode};
6use uuid::Uuid;
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use crate::policies::{
11 not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
12};
13use crate::router::Route;
14use crate::service::{
15 aws_error, esc, generate_etag, invalid_argument, xml_response, CloudFrontService,
16 DEFAULT_ACCOUNT,
17};
18use crate::state::Tag;
19use crate::streaming::{
20 StoredStreamingDistribution, StreamingDistributionConfig, StreamingDistributionConfigWithTags,
21};
22use crate::xml_io;
23
24const NS: &str = crate::NAMESPACE;
25const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
27impl CloudFrontService {
28 pub(crate) fn create_streaming_distribution(
29 &self,
30 req: &AwsRequest,
31 with_tags: bool,
32 ) -> Result<AwsResponse, AwsServiceError> {
33 let (config, tags) = if with_tags {
34 let parsed: StreamingDistributionConfigWithTags = xml_io::from_xml_root(&req.body)
35 .map_err(|e| {
36 invalid_argument(format!(
37 "invalid StreamingDistributionConfigWithTags XML: {e}"
38 ))
39 })?;
40 let tags = parsed
41 .tags
42 .items
43 .map(|i| {
44 i.tag
45 .into_iter()
46 .map(|t| Tag {
47 key: t.key,
48 value: t.value,
49 })
50 .collect()
51 })
52 .unwrap_or_default();
53 (parsed.streaming_distribution_config, tags)
54 } else {
55 let parsed: StreamingDistributionConfig =
56 xml_io::from_xml_root(&req.body).map_err(|e| {
57 invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
58 })?;
59 (parsed, Vec::new())
60 };
61 if config.caller_reference.is_empty() {
62 return Err(invalid_argument("CallerReference is required"));
63 }
64
65 let mut state = self.state.write();
66 let account = state
67 .accounts
68 .entry(DEFAULT_ACCOUNT.to_string())
69 .or_default();
70
71 if let Some(existing) = account
72 .streaming_distributions
73 .values()
74 .find(|d| d.config.caller_reference == config.caller_reference)
75 {
76 return Err(aws_error(
77 StatusCode::CONFLICT,
78 "StreamingDistributionAlreadyExists",
79 format!(
80 "Streaming distribution with same CallerReference exists: {}",
81 existing.id
82 ),
83 ));
84 }
85
86 let id = generate_streaming_id();
87 let now = Utc::now();
88 let etag = generate_etag();
89 let domain = format!("{}.cloudfront.net", id.to_lowercase());
90 let arn = format!(
91 "arn:aws:cloudfront::{}:streaming-distribution/{}",
92 DEFAULT_ACCOUNT, id
93 );
94
95 let stored = StoredStreamingDistribution {
96 id: id.clone(),
97 arn: arn.clone(),
98 status: "InProgress".to_string(),
102 last_modified_time: now,
103 domain_name: domain,
104 etag: etag.clone(),
105 config,
106 };
107 account
108 .streaming_distributions
109 .insert(id.clone(), stored.clone());
110 if !tags.is_empty() {
111 account.tags.insert(arn.clone(), tags);
112 }
113 drop(state);
114
115 self.schedule_streaming_distribution_deploy(id.clone());
116
117 let body = render_streaming_distribution(&stored);
118 let mut headers = HeaderMap::new();
119 if let Ok(v) = HeaderValue::from_str(&etag) {
120 headers.insert(ETAG, v);
121 }
122 if let Ok(v) = HeaderValue::from_str(&stored.arn) {
123 headers.insert(LOCATION, v);
124 }
125 Ok(xml_response(StatusCode::CREATED, body, headers))
126 }
127
128 pub(crate) fn get_streaming_distribution(
129 &self,
130 route: &Route,
131 ) -> Result<AwsResponse, AwsServiceError> {
132 let id = route_id(route, "StreamingDistribution")?;
133 let state = self.state.read();
134 let d = state
135 .accounts
136 .get(DEFAULT_ACCOUNT)
137 .and_then(|a| a.streaming_distributions.get(&id).cloned())
138 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
139 drop(state);
140 let body = render_streaming_distribution(&d);
141 Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
142 }
143
144 pub(crate) fn get_streaming_distribution_config(
145 &self,
146 route: &Route,
147 ) -> Result<AwsResponse, AwsServiceError> {
148 let id = route_id(route, "StreamingDistribution")?;
149 let state = self.state.read();
150 let d = state
151 .accounts
152 .get(DEFAULT_ACCOUNT)
153 .and_then(|a| a.streaming_distributions.get(&id).cloned())
154 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
155 drop(state);
156 let body = render_streaming_distribution_config(&d.config);
157 Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
158 }
159
160 pub(crate) fn update_streaming_distribution(
161 &self,
162 req: &AwsRequest,
163 route: &Route,
164 ) -> Result<AwsResponse, AwsServiceError> {
165 let id = route_id(route, "StreamingDistribution")?;
166 let if_match = require_if_match(req)?;
167 let cfg: StreamingDistributionConfig = xml_io::from_xml_root(&req.body).map_err(|e| {
168 invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
169 })?;
170 if cfg.caller_reference.is_empty() {
171 return Err(invalid_argument("CallerReference is required"));
172 }
173
174 let mut state = self.state.write();
175 let account = state
176 .accounts
177 .get_mut(DEFAULT_ACCOUNT)
178 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
179 let d = account
180 .streaming_distributions
181 .get_mut(&id)
182 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
183 if d.etag != if_match {
184 return Err(precondition_failed());
185 }
186 if d.config.caller_reference != cfg.caller_reference {
187 return Err(invalid_argument(
188 "CallerReference cannot change on UpdateStreamingDistribution",
189 ));
190 }
191 let config_changed = !streaming_configs_equal(&d.config, &cfg);
195 if config_changed {
196 d.config = cfg;
197 d.etag = generate_etag();
198 d.last_modified_time = Utc::now();
199 d.status = "InProgress".to_string();
203 }
204 let snap = d.clone();
205 drop(state);
206
207 if config_changed {
208 self.schedule_streaming_distribution_deploy(id.clone());
209 }
210
211 let body = render_streaming_distribution(&snap);
212 Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
213 }
214
215 pub(crate) fn delete_streaming_distribution(
216 &self,
217 req: &AwsRequest,
218 route: &Route,
219 ) -> Result<AwsResponse, AwsServiceError> {
220 let id = route_id(route, "StreamingDistribution")?;
221 let if_match = require_if_match(req)?;
222 let mut state = self.state.write();
223 let account = state
224 .accounts
225 .get_mut(DEFAULT_ACCOUNT)
226 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
227 let d = account
228 .streaming_distributions
229 .get(&id)
230 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
231 if d.etag != if_match {
232 return Err(precondition_failed());
233 }
234 if d.config.enabled {
235 return Err(aws_error(
236 StatusCode::PRECONDITION_FAILED,
237 "StreamingDistributionNotDisabled",
238 "StreamingDistribution must be disabled before deletion",
239 ));
240 }
241 let arn = d.arn.clone();
242 account.streaming_distributions.remove(&id);
243 account.tags.remove(&arn);
246 drop(state);
247 Ok(crate::policies::empty(StatusCode::NO_CONTENT))
248 }
249
250 pub(crate) fn list_streaming_distributions(
251 &self,
252 _req: &AwsRequest,
253 ) -> Result<AwsResponse, AwsServiceError> {
254 let state = self.state.read();
255 let mut items: Vec<StoredStreamingDistribution> = state
256 .accounts
257 .get(DEFAULT_ACCOUNT)
258 .map(|a| a.streaming_distributions.values().cloned().collect())
259 .unwrap_or_default();
260 drop(state);
261 items.sort_by(|a, b| a.id.cmp(&b.id));
262
263 let mut body = String::with_capacity(512);
264 body.push_str(XML_DECL);
265 body.push_str(&format!("<StreamingDistributionList xmlns=\"{NS}\">"));
266 body.push_str("<Marker></Marker>");
267 body.push_str("<MaxItems>100</MaxItems>");
268 body.push_str("<IsTruncated>false</IsTruncated>");
269 body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
270 body.push_str("<Items>");
271 for d in &items {
272 body.push_str("<StreamingDistributionSummary>");
273 push_summary_inner(&mut body, d);
274 body.push_str("</StreamingDistributionSummary>");
275 }
276 body.push_str("</Items>");
277 body.push_str("</StreamingDistributionList>");
278 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
279 }
280}
281
282fn generate_streaming_id() -> String {
285 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
286 format!("S{}", &raw[..13])
287}
288
289fn render_streaming_distribution(d: &StoredStreamingDistribution) -> String {
290 let mut out = String::with_capacity(512);
291 out.push_str(XML_DECL);
292 out.push_str(&format!("<StreamingDistribution xmlns=\"{NS}\">"));
293 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
294 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
295 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
296 out.push_str(&format!(
297 "<LastModifiedTime>{}</LastModifiedTime>",
298 rfc3339(&d.last_modified_time)
299 ));
300 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
301 out.push_str(&render_active_trusted_signers(&d.config));
302 out.push_str(&serialize_config_inner(&d.config));
303 out.push_str("</StreamingDistribution>");
304 out
305}
306
307fn render_streaming_distribution_config(cfg: &StreamingDistributionConfig) -> String {
308 let mut out = String::with_capacity(512);
309 out.push_str(XML_DECL);
310 out.push_str(&format!("<StreamingDistributionConfig xmlns=\"{NS}\">"));
311 out.push_str(&serialize_config_body(cfg));
312 out.push_str("</StreamingDistributionConfig>");
313 out
314}
315
316fn push_summary_inner(out: &mut String, d: &StoredStreamingDistribution) {
317 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
318 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
319 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
320 out.push_str(&format!(
321 "<LastModifiedTime>{}</LastModifiedTime>",
322 rfc3339(&d.last_modified_time)
323 ));
324 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
325 out.push_str("<S3Origin>");
327 out.push_str(&format!(
328 "<DomainName>{}</DomainName>",
329 esc(&d.config.s3_origin.domain_name)
330 ));
331 out.push_str(&format!(
332 "<OriginAccessIdentity>{}</OriginAccessIdentity>",
333 esc(&d.config.s3_origin.origin_access_identity)
334 ));
335 out.push_str("</S3Origin>");
336 out.push_str(&render_aliases(&d.config));
337 out.push_str(&render_trusted_signers(&d.config));
338 out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
339 out.push_str(&format!(
340 "<PriceClass>{}</PriceClass>",
341 esc(&d.config.price_class)
342 ));
343 out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
344}
345
346fn serialize_config_inner(cfg: &StreamingDistributionConfig) -> String {
347 let mut out = String::with_capacity(512);
348 out.push_str("<StreamingDistributionConfig>");
349 out.push_str(&serialize_config_body(cfg));
350 out.push_str("</StreamingDistributionConfig>");
351 out
352}
353
354fn serialize_config_body(cfg: &StreamingDistributionConfig) -> String {
355 let mut out = String::with_capacity(512);
356 out.push_str(&format!(
357 "<CallerReference>{}</CallerReference>",
358 esc(&cfg.caller_reference)
359 ));
360 out.push_str("<S3Origin>");
361 out.push_str(&format!(
362 "<DomainName>{}</DomainName>",
363 esc(&cfg.s3_origin.domain_name)
364 ));
365 out.push_str(&format!(
366 "<OriginAccessIdentity>{}</OriginAccessIdentity>",
367 esc(&cfg.s3_origin.origin_access_identity)
368 ));
369 out.push_str("</S3Origin>");
370 out.push_str(&render_aliases(cfg));
371 out.push_str(&format!("<Comment>{}</Comment>", esc(&cfg.comment)));
372 if let Some(log) = &cfg.logging {
373 out.push_str("<Logging>");
374 out.push_str(&format!("<Enabled>{}</Enabled>", log.enabled));
375 out.push_str(&format!("<Bucket>{}</Bucket>", esc(&log.bucket)));
376 out.push_str(&format!("<Prefix>{}</Prefix>", esc(&log.prefix)));
377 out.push_str("</Logging>");
378 }
379 out.push_str(&render_trusted_signers(cfg));
380 out.push_str(&format!(
381 "<PriceClass>{}</PriceClass>",
382 esc(&cfg.price_class)
383 ));
384 out.push_str(&format!("<Enabled>{}</Enabled>", cfg.enabled));
385 out
386}
387
388fn render_aliases(cfg: &StreamingDistributionConfig) -> String {
389 let mut out = String::with_capacity(64);
390 let (qty, items) = match &cfg.aliases {
391 Some(a) => (a.quantity, a.items.clone()),
392 None => (0, None),
393 };
394 out.push_str("<Aliases>");
395 out.push_str(&format!("<Quantity>{}</Quantity>", qty));
396 if let Some(it) = items {
397 out.push_str("<Items>");
398 for c in &it.cname {
399 out.push_str(&format!("<CNAME>{}</CNAME>", esc(c)));
400 }
401 out.push_str("</Items>");
402 }
403 out.push_str("</Aliases>");
404 out
405}
406
407fn render_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
408 let mut out = String::with_capacity(64);
409 out.push_str("<TrustedSigners>");
410 out.push_str(&format!(
411 "<Enabled>{}</Enabled>",
412 cfg.trusted_signers.enabled
413 ));
414 out.push_str(&format!(
415 "<Quantity>{}</Quantity>",
416 cfg.trusted_signers.quantity
417 ));
418 if let Some(it) = &cfg.trusted_signers.items {
419 out.push_str("<Items>");
420 for a in &it.aws_account_number {
421 out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
422 }
423 out.push_str("</Items>");
424 }
425 out.push_str("</TrustedSigners>");
426 out
427}
428
429fn render_active_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
430 let mut out = String::with_capacity(64);
431 out.push_str("<ActiveTrustedSigners>");
432 out.push_str(&format!(
433 "<Enabled>{}</Enabled>",
434 cfg.trusted_signers.enabled
435 ));
436 out.push_str(&format!(
437 "<Quantity>{}</Quantity>",
438 cfg.trusted_signers.quantity
439 ));
440 if let Some(it) = &cfg.trusted_signers.items {
441 out.push_str("<Items>");
442 for a in &it.aws_account_number {
443 out.push_str("<Signer>");
444 out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
445 out.push_str("</Signer>");
446 }
447 out.push_str("</Items>");
448 }
449 out.push_str("</ActiveTrustedSigners>");
450 out
451}
452
453fn streaming_configs_equal(
458 lhs: &StreamingDistributionConfig,
459 rhs: &StreamingDistributionConfig,
460) -> bool {
461 let Ok(a) = xml_io::to_xml_root("StreamingDistributionConfig", lhs) else {
462 return false;
463 };
464 let Ok(b) = xml_io::to_xml_root("StreamingDistributionConfig", rhs) else {
465 return false;
466 };
467 a == b
468}