1use chrono::Utc;
4use http::header::{ETAG, LOCATION};
5use http::{HeaderMap, HeaderValue, StatusCode};
6use uuid::Uuid;
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use crate::policies::{
11 not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
12};
13use crate::router::Route;
14use crate::service::{
15 aws_error, esc, generate_etag, invalid_argument, xml_response, CloudFrontService,
16 DEFAULT_ACCOUNT,
17};
18use crate::state::Tag;
19use crate::streaming::{
20 StoredStreamingDistribution, StreamingDistributionConfig, StreamingDistributionConfigWithTags,
21};
22use crate::xml_io;
23
24const NS: &str = crate::NAMESPACE;
25const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
27impl CloudFrontService {
28 pub(crate) fn create_streaming_distribution(
29 &self,
30 req: &AwsRequest,
31 with_tags: bool,
32 ) -> Result<AwsResponse, AwsServiceError> {
33 let (config, tags) = if with_tags {
34 let parsed: StreamingDistributionConfigWithTags = xml_io::from_xml_root(&req.body)
35 .map_err(|e| {
36 invalid_argument(format!(
37 "invalid StreamingDistributionConfigWithTags XML: {e}"
38 ))
39 })?;
40 let tags = parsed
41 .tags
42 .items
43 .map(|i| {
44 i.tag
45 .into_iter()
46 .map(|t| Tag {
47 key: t.key,
48 value: t.value,
49 })
50 .collect()
51 })
52 .unwrap_or_default();
53 (parsed.streaming_distribution_config, tags)
54 } else {
55 let parsed: StreamingDistributionConfig =
56 xml_io::from_xml_root(&req.body).map_err(|e| {
57 invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
58 })?;
59 (parsed, Vec::new())
60 };
61 if config.caller_reference.is_empty() {
62 return Err(invalid_argument("CallerReference is required"));
63 }
64
65 let mut state = self.state.write();
66 let account = state
67 .accounts
68 .entry(DEFAULT_ACCOUNT.to_string())
69 .or_default();
70
71 if let Some(existing) = account
72 .streaming_distributions
73 .values()
74 .find(|d| d.config.caller_reference == config.caller_reference)
75 {
76 return Err(aws_error(
77 StatusCode::CONFLICT,
78 "StreamingDistributionAlreadyExists",
79 format!(
80 "Streaming distribution with same CallerReference exists: {}",
81 existing.id
82 ),
83 ));
84 }
85
86 let id = generate_streaming_id();
87 let now = Utc::now();
88 let etag = generate_etag();
89 let domain = format!("{}.cloudfront.net", id.to_lowercase());
90 let arn = format!(
91 "arn:aws:cloudfront::{}:streaming-distribution/{}",
92 DEFAULT_ACCOUNT, id
93 );
94
95 let stored = StoredStreamingDistribution {
96 id: id.clone(),
97 arn: arn.clone(),
98 status: "Deployed".to_string(),
99 last_modified_time: now,
100 domain_name: domain,
101 etag: etag.clone(),
102 config,
103 };
104 account
105 .streaming_distributions
106 .insert(id.clone(), stored.clone());
107 if !tags.is_empty() {
108 account.tags.insert(arn.clone(), tags);
109 }
110 drop(state);
111
112 let body = render_streaming_distribution(&stored);
113 let mut headers = HeaderMap::new();
114 if let Ok(v) = HeaderValue::from_str(&etag) {
115 headers.insert(ETAG, v);
116 }
117 if let Ok(v) = HeaderValue::from_str(&stored.arn) {
118 headers.insert(LOCATION, v);
119 }
120 Ok(xml_response(StatusCode::CREATED, body, headers))
121 }
122
123 pub(crate) fn get_streaming_distribution(
124 &self,
125 route: &Route,
126 ) -> Result<AwsResponse, AwsServiceError> {
127 let id = route_id(route, "StreamingDistribution")?;
128 let state = self.state.read();
129 let d = state
130 .accounts
131 .get(DEFAULT_ACCOUNT)
132 .and_then(|a| a.streaming_distributions.get(&id).cloned())
133 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
134 drop(state);
135 let body = render_streaming_distribution(&d);
136 Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
137 }
138
139 pub(crate) fn get_streaming_distribution_config(
140 &self,
141 route: &Route,
142 ) -> Result<AwsResponse, AwsServiceError> {
143 let id = route_id(route, "StreamingDistribution")?;
144 let state = self.state.read();
145 let d = state
146 .accounts
147 .get(DEFAULT_ACCOUNT)
148 .and_then(|a| a.streaming_distributions.get(&id).cloned())
149 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
150 drop(state);
151 let body = render_streaming_distribution_config(&d.config);
152 Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
153 }
154
155 pub(crate) fn update_streaming_distribution(
156 &self,
157 req: &AwsRequest,
158 route: &Route,
159 ) -> Result<AwsResponse, AwsServiceError> {
160 let id = route_id(route, "StreamingDistribution")?;
161 let if_match = require_if_match(req)?;
162 let cfg: StreamingDistributionConfig = xml_io::from_xml_root(&req.body).map_err(|e| {
163 invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
164 })?;
165 if cfg.caller_reference.is_empty() {
166 return Err(invalid_argument("CallerReference is required"));
167 }
168
169 let mut state = self.state.write();
170 let account = state
171 .accounts
172 .get_mut(DEFAULT_ACCOUNT)
173 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
174 let d = account
175 .streaming_distributions
176 .get_mut(&id)
177 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
178 if d.etag != if_match {
179 return Err(precondition_failed());
180 }
181 if d.config.caller_reference != cfg.caller_reference {
182 return Err(invalid_argument(
183 "CallerReference cannot change on UpdateStreamingDistribution",
184 ));
185 }
186 d.config = cfg;
187 d.etag = generate_etag();
188 d.last_modified_time = Utc::now();
189 let snap = d.clone();
190 drop(state);
191 let body = render_streaming_distribution(&snap);
192 Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
193 }
194
195 pub(crate) fn delete_streaming_distribution(
196 &self,
197 req: &AwsRequest,
198 route: &Route,
199 ) -> Result<AwsResponse, AwsServiceError> {
200 let id = route_id(route, "StreamingDistribution")?;
201 let if_match = require_if_match(req)?;
202 let mut state = self.state.write();
203 let account = state
204 .accounts
205 .get_mut(DEFAULT_ACCOUNT)
206 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
207 let d = account
208 .streaming_distributions
209 .get(&id)
210 .ok_or_else(|| not_found("StreamingDistribution", &id))?;
211 if d.etag != if_match {
212 return Err(precondition_failed());
213 }
214 if d.config.enabled {
215 return Err(aws_error(
216 StatusCode::PRECONDITION_FAILED,
217 "StreamingDistributionNotDisabled",
218 "StreamingDistribution must be disabled before deletion",
219 ));
220 }
221 let arn = d.arn.clone();
222 account.streaming_distributions.remove(&id);
223 account.tags.remove(&arn);
226 drop(state);
227 Ok(crate::policies::empty(StatusCode::NO_CONTENT))
228 }
229
230 pub(crate) fn list_streaming_distributions(
231 &self,
232 _req: &AwsRequest,
233 ) -> Result<AwsResponse, AwsServiceError> {
234 let state = self.state.read();
235 let mut items: Vec<StoredStreamingDistribution> = state
236 .accounts
237 .get(DEFAULT_ACCOUNT)
238 .map(|a| a.streaming_distributions.values().cloned().collect())
239 .unwrap_or_default();
240 drop(state);
241 items.sort_by(|a, b| a.id.cmp(&b.id));
242
243 let mut body = String::with_capacity(512);
244 body.push_str(XML_DECL);
245 body.push_str(&format!("<StreamingDistributionList xmlns=\"{NS}\">"));
246 body.push_str("<Marker></Marker>");
247 body.push_str("<MaxItems>100</MaxItems>");
248 body.push_str("<IsTruncated>false</IsTruncated>");
249 body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
250 body.push_str("<Items>");
251 for d in &items {
252 body.push_str("<StreamingDistributionSummary>");
253 push_summary_inner(&mut body, d);
254 body.push_str("</StreamingDistributionSummary>");
255 }
256 body.push_str("</Items>");
257 body.push_str("</StreamingDistributionList>");
258 Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
259 }
260}
261
262fn generate_streaming_id() -> String {
265 let raw = Uuid::new_v4().simple().to_string().to_uppercase();
266 format!("S{}", &raw[..13])
267}
268
269fn render_streaming_distribution(d: &StoredStreamingDistribution) -> String {
270 let mut out = String::with_capacity(512);
271 out.push_str(XML_DECL);
272 out.push_str(&format!("<StreamingDistribution xmlns=\"{NS}\">"));
273 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
274 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
275 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
276 out.push_str(&format!(
277 "<LastModifiedTime>{}</LastModifiedTime>",
278 rfc3339(&d.last_modified_time)
279 ));
280 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
281 out.push_str(&render_active_trusted_signers(&d.config));
282 out.push_str(&serialize_config_inner(&d.config));
283 out.push_str("</StreamingDistribution>");
284 out
285}
286
287fn render_streaming_distribution_config(cfg: &StreamingDistributionConfig) -> String {
288 let mut out = String::with_capacity(512);
289 out.push_str(XML_DECL);
290 out.push_str(&format!("<StreamingDistributionConfig xmlns=\"{NS}\">"));
291 out.push_str(&serialize_config_body(cfg));
292 out.push_str("</StreamingDistributionConfig>");
293 out
294}
295
296fn push_summary_inner(out: &mut String, d: &StoredStreamingDistribution) {
297 out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
298 out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
299 out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
300 out.push_str(&format!(
301 "<LastModifiedTime>{}</LastModifiedTime>",
302 rfc3339(&d.last_modified_time)
303 ));
304 out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
305 out.push_str("<S3Origin>");
307 out.push_str(&format!(
308 "<DomainName>{}</DomainName>",
309 esc(&d.config.s3_origin.domain_name)
310 ));
311 out.push_str(&format!(
312 "<OriginAccessIdentity>{}</OriginAccessIdentity>",
313 esc(&d.config.s3_origin.origin_access_identity)
314 ));
315 out.push_str("</S3Origin>");
316 out.push_str(&render_aliases(&d.config));
317 out.push_str(&render_trusted_signers(&d.config));
318 out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
319 out.push_str(&format!(
320 "<PriceClass>{}</PriceClass>",
321 esc(&d.config.price_class)
322 ));
323 out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
324}
325
326fn serialize_config_inner(cfg: &StreamingDistributionConfig) -> String {
327 let mut out = String::with_capacity(512);
328 out.push_str("<StreamingDistributionConfig>");
329 out.push_str(&serialize_config_body(cfg));
330 out.push_str("</StreamingDistributionConfig>");
331 out
332}
333
334fn serialize_config_body(cfg: &StreamingDistributionConfig) -> String {
335 let mut out = String::with_capacity(512);
336 out.push_str(&format!(
337 "<CallerReference>{}</CallerReference>",
338 esc(&cfg.caller_reference)
339 ));
340 out.push_str("<S3Origin>");
341 out.push_str(&format!(
342 "<DomainName>{}</DomainName>",
343 esc(&cfg.s3_origin.domain_name)
344 ));
345 out.push_str(&format!(
346 "<OriginAccessIdentity>{}</OriginAccessIdentity>",
347 esc(&cfg.s3_origin.origin_access_identity)
348 ));
349 out.push_str("</S3Origin>");
350 out.push_str(&render_aliases(cfg));
351 out.push_str(&format!("<Comment>{}</Comment>", esc(&cfg.comment)));
352 if let Some(log) = &cfg.logging {
353 out.push_str("<Logging>");
354 out.push_str(&format!("<Enabled>{}</Enabled>", log.enabled));
355 out.push_str(&format!("<Bucket>{}</Bucket>", esc(&log.bucket)));
356 out.push_str(&format!("<Prefix>{}</Prefix>", esc(&log.prefix)));
357 out.push_str("</Logging>");
358 }
359 out.push_str(&render_trusted_signers(cfg));
360 out.push_str(&format!(
361 "<PriceClass>{}</PriceClass>",
362 esc(&cfg.price_class)
363 ));
364 out.push_str(&format!("<Enabled>{}</Enabled>", cfg.enabled));
365 out
366}
367
368fn render_aliases(cfg: &StreamingDistributionConfig) -> String {
369 let mut out = String::with_capacity(64);
370 let (qty, items) = match &cfg.aliases {
371 Some(a) => (a.quantity, a.items.clone()),
372 None => (0, None),
373 };
374 out.push_str("<Aliases>");
375 out.push_str(&format!("<Quantity>{}</Quantity>", qty));
376 if let Some(it) = items {
377 out.push_str("<Items>");
378 for c in &it.cname {
379 out.push_str(&format!("<CNAME>{}</CNAME>", esc(c)));
380 }
381 out.push_str("</Items>");
382 }
383 out.push_str("</Aliases>");
384 out
385}
386
387fn render_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
388 let mut out = String::with_capacity(64);
389 out.push_str("<TrustedSigners>");
390 out.push_str(&format!(
391 "<Enabled>{}</Enabled>",
392 cfg.trusted_signers.enabled
393 ));
394 out.push_str(&format!(
395 "<Quantity>{}</Quantity>",
396 cfg.trusted_signers.quantity
397 ));
398 if let Some(it) = &cfg.trusted_signers.items {
399 out.push_str("<Items>");
400 for a in &it.aws_account_number {
401 out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
402 }
403 out.push_str("</Items>");
404 }
405 out.push_str("</TrustedSigners>");
406 out
407}
408
409fn render_active_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
410 let mut out = String::with_capacity(64);
411 out.push_str("<ActiveTrustedSigners>");
412 out.push_str(&format!(
413 "<Enabled>{}</Enabled>",
414 cfg.trusted_signers.enabled
415 ));
416 out.push_str(&format!(
417 "<Quantity>{}</Quantity>",
418 cfg.trusted_signers.quantity
419 ));
420 if let Some(it) = &cfg.trusted_signers.items {
421 out.push_str("<Items>");
422 for a in &it.aws_account_number {
423 out.push_str("<Signer>");
424 out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
425 out.push_str("</Signer>");
426 }
427 out.push_str("</Items>");
428 }
429 out.push_str("</ActiveTrustedSigners>");
430 out
431}