Skip to main content

fakecloud_cloudfront/
streaming_service.rs

1//! Handlers for CloudFront Streaming Distributions (legacy RTMP).
2
3use chrono::Utc;
4use http::header::{ETAG, LOCATION};
5use http::{HeaderMap, HeaderValue, StatusCode};
6use uuid::Uuid;
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use crate::policies::{
11    not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
12};
13use crate::router::Route;
14use crate::service::{
15    aws_error, esc, generate_etag, invalid_argument, xml_response, CloudFrontService,
16    DEFAULT_ACCOUNT,
17};
18use crate::state::Tag;
19use crate::streaming::{
20    StoredStreamingDistribution, StreamingDistributionConfig, StreamingDistributionConfigWithTags,
21};
22use crate::xml_io;
23
/// XML namespace (re-exported from the crate root) written as the `xmlns`
/// attribute on the root element of every document rendered in this file.
const NS: &str = crate::NAMESPACE;
/// XML declaration placed at the very start of every rendered response body.
const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
27impl CloudFrontService {
28    pub(crate) fn create_streaming_distribution(
29        &self,
30        req: &AwsRequest,
31        with_tags: bool,
32    ) -> Result<AwsResponse, AwsServiceError> {
33        let (config, tags) = if with_tags {
34            let parsed: StreamingDistributionConfigWithTags = xml_io::from_xml_root(&req.body)
35                .map_err(|e| {
36                    invalid_argument(format!(
37                        "invalid StreamingDistributionConfigWithTags XML: {e}"
38                    ))
39                })?;
40            let tags = parsed
41                .tags
42                .items
43                .map(|i| {
44                    i.tag
45                        .into_iter()
46                        .map(|t| Tag {
47                            key: t.key,
48                            value: t.value,
49                        })
50                        .collect()
51                })
52                .unwrap_or_default();
53            (parsed.streaming_distribution_config, tags)
54        } else {
55            let parsed: StreamingDistributionConfig =
56                xml_io::from_xml_root(&req.body).map_err(|e| {
57                    invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
58                })?;
59            (parsed, Vec::new())
60        };
61        if config.caller_reference.is_empty() {
62            return Err(invalid_argument("CallerReference is required"));
63        }
64
65        let mut state = self.state.write();
66        let account = state
67            .accounts
68            .entry(DEFAULT_ACCOUNT.to_string())
69            .or_default();
70
71        if let Some(existing) = account
72            .streaming_distributions
73            .values()
74            .find(|d| d.config.caller_reference == config.caller_reference)
75        {
76            return Err(aws_error(
77                StatusCode::CONFLICT,
78                "StreamingDistributionAlreadyExists",
79                format!(
80                    "Streaming distribution with same CallerReference exists: {}",
81                    existing.id
82                ),
83            ));
84        }
85
86        let id = generate_streaming_id();
87        let now = Utc::now();
88        let etag = generate_etag();
89        let domain = format!("{}.cloudfront.net", id.to_lowercase());
90        let arn = format!(
91            "arn:aws:cloudfront::{}:streaming-distribution/{}",
92            DEFAULT_ACCOUNT, id
93        );
94
95        let stored = StoredStreamingDistribution {
96            id: id.clone(),
97            arn: arn.clone(),
98            status: "Deployed".to_string(),
99            last_modified_time: now,
100            domain_name: domain,
101            etag: etag.clone(),
102            config,
103        };
104        account
105            .streaming_distributions
106            .insert(id.clone(), stored.clone());
107        if !tags.is_empty() {
108            account.tags.insert(arn.clone(), tags);
109        }
110        drop(state);
111
112        let body = render_streaming_distribution(&stored);
113        let mut headers = HeaderMap::new();
114        if let Ok(v) = HeaderValue::from_str(&etag) {
115            headers.insert(ETAG, v);
116        }
117        if let Ok(v) = HeaderValue::from_str(&stored.arn) {
118            headers.insert(LOCATION, v);
119        }
120        Ok(xml_response(StatusCode::CREATED, body, headers))
121    }
122
123    pub(crate) fn get_streaming_distribution(
124        &self,
125        route: &Route,
126    ) -> Result<AwsResponse, AwsServiceError> {
127        let id = route_id(route, "StreamingDistribution")?;
128        let state = self.state.read();
129        let d = state
130            .accounts
131            .get(DEFAULT_ACCOUNT)
132            .and_then(|a| a.streaming_distributions.get(&id).cloned())
133            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
134        drop(state);
135        let body = render_streaming_distribution(&d);
136        Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
137    }
138
139    pub(crate) fn get_streaming_distribution_config(
140        &self,
141        route: &Route,
142    ) -> Result<AwsResponse, AwsServiceError> {
143        let id = route_id(route, "StreamingDistribution")?;
144        let state = self.state.read();
145        let d = state
146            .accounts
147            .get(DEFAULT_ACCOUNT)
148            .and_then(|a| a.streaming_distributions.get(&id).cloned())
149            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
150        drop(state);
151        let body = render_streaming_distribution_config(&d.config);
152        Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
153    }
154
155    pub(crate) fn update_streaming_distribution(
156        &self,
157        req: &AwsRequest,
158        route: &Route,
159    ) -> Result<AwsResponse, AwsServiceError> {
160        let id = route_id(route, "StreamingDistribution")?;
161        let if_match = require_if_match(req)?;
162        let cfg: StreamingDistributionConfig = xml_io::from_xml_root(&req.body).map_err(|e| {
163            invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
164        })?;
165        if cfg.caller_reference.is_empty() {
166            return Err(invalid_argument("CallerReference is required"));
167        }
168
169        let mut state = self.state.write();
170        let account = state
171            .accounts
172            .get_mut(DEFAULT_ACCOUNT)
173            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
174        let d = account
175            .streaming_distributions
176            .get_mut(&id)
177            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
178        if d.etag != if_match {
179            return Err(precondition_failed());
180        }
181        if d.config.caller_reference != cfg.caller_reference {
182            return Err(invalid_argument(
183                "CallerReference cannot change on UpdateStreamingDistribution",
184            ));
185        }
186        d.config = cfg;
187        d.etag = generate_etag();
188        d.last_modified_time = Utc::now();
189        let snap = d.clone();
190        drop(state);
191        let body = render_streaming_distribution(&snap);
192        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
193    }
194
195    pub(crate) fn delete_streaming_distribution(
196        &self,
197        req: &AwsRequest,
198        route: &Route,
199    ) -> Result<AwsResponse, AwsServiceError> {
200        let id = route_id(route, "StreamingDistribution")?;
201        let if_match = require_if_match(req)?;
202        let mut state = self.state.write();
203        let account = state
204            .accounts
205            .get_mut(DEFAULT_ACCOUNT)
206            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
207        let d = account
208            .streaming_distributions
209            .get(&id)
210            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
211        if d.etag != if_match {
212            return Err(precondition_failed());
213        }
214        if d.config.enabled {
215            return Err(aws_error(
216                StatusCode::PRECONDITION_FAILED,
217                "StreamingDistributionNotDisabled",
218                "StreamingDistribution must be disabled before deletion",
219            ));
220        }
221        let arn = d.arn.clone();
222        account.streaming_distributions.remove(&id);
223        // Tags are keyed by ARN — drop them too so ListTagsForResource
224        // doesn't return tags for a deleted resource.
225        account.tags.remove(&arn);
226        drop(state);
227        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
228    }
229
230    pub(crate) fn list_streaming_distributions(
231        &self,
232        _req: &AwsRequest,
233    ) -> Result<AwsResponse, AwsServiceError> {
234        let state = self.state.read();
235        let mut items: Vec<StoredStreamingDistribution> = state
236            .accounts
237            .get(DEFAULT_ACCOUNT)
238            .map(|a| a.streaming_distributions.values().cloned().collect())
239            .unwrap_or_default();
240        drop(state);
241        items.sort_by(|a, b| a.id.cmp(&b.id));
242
243        let mut body = String::with_capacity(512);
244        body.push_str(XML_DECL);
245        body.push_str(&format!("<StreamingDistributionList xmlns=\"{NS}\">"));
246        body.push_str("<Marker></Marker>");
247        body.push_str("<MaxItems>100</MaxItems>");
248        body.push_str("<IsTruncated>false</IsTruncated>");
249        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
250        body.push_str("<Items>");
251        for d in &items {
252            body.push_str("<StreamingDistributionSummary>");
253            push_summary_inner(&mut body, d);
254            body.push_str("</StreamingDistributionSummary>");
255        }
256        body.push_str("</Items>");
257        body.push_str("</StreamingDistributionList>");
258        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
259    }
260}
261
262// ─── Helpers ──────────────────────────────────────────────────────────
263
264fn generate_streaming_id() -> String {
265    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
266    format!("S{}", &raw[..13])
267}
268
269fn render_streaming_distribution(d: &StoredStreamingDistribution) -> String {
270    let mut out = String::with_capacity(512);
271    out.push_str(XML_DECL);
272    out.push_str(&format!("<StreamingDistribution xmlns=\"{NS}\">"));
273    out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
274    out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
275    out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
276    out.push_str(&format!(
277        "<LastModifiedTime>{}</LastModifiedTime>",
278        rfc3339(&d.last_modified_time)
279    ));
280    out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
281    out.push_str(&render_active_trusted_signers(&d.config));
282    out.push_str(&serialize_config_inner(&d.config));
283    out.push_str("</StreamingDistribution>");
284    out
285}
286
287fn render_streaming_distribution_config(cfg: &StreamingDistributionConfig) -> String {
288    let mut out = String::with_capacity(512);
289    out.push_str(XML_DECL);
290    out.push_str(&format!("<StreamingDistributionConfig xmlns=\"{NS}\">"));
291    out.push_str(&serialize_config_body(cfg));
292    out.push_str("</StreamingDistributionConfig>");
293    out
294}
295
296fn push_summary_inner(out: &mut String, d: &StoredStreamingDistribution) {
297    out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
298    out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
299    out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
300    out.push_str(&format!(
301        "<LastModifiedTime>{}</LastModifiedTime>",
302        rfc3339(&d.last_modified_time)
303    ));
304    out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
305    // S3Origin
306    out.push_str("<S3Origin>");
307    out.push_str(&format!(
308        "<DomainName>{}</DomainName>",
309        esc(&d.config.s3_origin.domain_name)
310    ));
311    out.push_str(&format!(
312        "<OriginAccessIdentity>{}</OriginAccessIdentity>",
313        esc(&d.config.s3_origin.origin_access_identity)
314    ));
315    out.push_str("</S3Origin>");
316    out.push_str(&render_aliases(&d.config));
317    out.push_str(&render_trusted_signers(&d.config));
318    out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
319    out.push_str(&format!(
320        "<PriceClass>{}</PriceClass>",
321        esc(&d.config.price_class)
322    ));
323    out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
324}
325
326fn serialize_config_inner(cfg: &StreamingDistributionConfig) -> String {
327    let mut out = String::with_capacity(512);
328    out.push_str("<StreamingDistributionConfig>");
329    out.push_str(&serialize_config_body(cfg));
330    out.push_str("</StreamingDistributionConfig>");
331    out
332}
333
334fn serialize_config_body(cfg: &StreamingDistributionConfig) -> String {
335    let mut out = String::with_capacity(512);
336    out.push_str(&format!(
337        "<CallerReference>{}</CallerReference>",
338        esc(&cfg.caller_reference)
339    ));
340    out.push_str("<S3Origin>");
341    out.push_str(&format!(
342        "<DomainName>{}</DomainName>",
343        esc(&cfg.s3_origin.domain_name)
344    ));
345    out.push_str(&format!(
346        "<OriginAccessIdentity>{}</OriginAccessIdentity>",
347        esc(&cfg.s3_origin.origin_access_identity)
348    ));
349    out.push_str("</S3Origin>");
350    out.push_str(&render_aliases(cfg));
351    out.push_str(&format!("<Comment>{}</Comment>", esc(&cfg.comment)));
352    if let Some(log) = &cfg.logging {
353        out.push_str("<Logging>");
354        out.push_str(&format!("<Enabled>{}</Enabled>", log.enabled));
355        out.push_str(&format!("<Bucket>{}</Bucket>", esc(&log.bucket)));
356        out.push_str(&format!("<Prefix>{}</Prefix>", esc(&log.prefix)));
357        out.push_str("</Logging>");
358    }
359    out.push_str(&render_trusted_signers(cfg));
360    out.push_str(&format!(
361        "<PriceClass>{}</PriceClass>",
362        esc(&cfg.price_class)
363    ));
364    out.push_str(&format!("<Enabled>{}</Enabled>", cfg.enabled));
365    out
366}
367
368fn render_aliases(cfg: &StreamingDistributionConfig) -> String {
369    let mut out = String::with_capacity(64);
370    let (qty, items) = match &cfg.aliases {
371        Some(a) => (a.quantity, a.items.clone()),
372        None => (0, None),
373    };
374    out.push_str("<Aliases>");
375    out.push_str(&format!("<Quantity>{}</Quantity>", qty));
376    if let Some(it) = items {
377        out.push_str("<Items>");
378        for c in &it.cname {
379            out.push_str(&format!("<CNAME>{}</CNAME>", esc(c)));
380        }
381        out.push_str("</Items>");
382    }
383    out.push_str("</Aliases>");
384    out
385}
386
387fn render_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
388    let mut out = String::with_capacity(64);
389    out.push_str("<TrustedSigners>");
390    out.push_str(&format!(
391        "<Enabled>{}</Enabled>",
392        cfg.trusted_signers.enabled
393    ));
394    out.push_str(&format!(
395        "<Quantity>{}</Quantity>",
396        cfg.trusted_signers.quantity
397    ));
398    if let Some(it) = &cfg.trusted_signers.items {
399        out.push_str("<Items>");
400        for a in &it.aws_account_number {
401            out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
402        }
403        out.push_str("</Items>");
404    }
405    out.push_str("</TrustedSigners>");
406    out
407}
408
409fn render_active_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
410    let mut out = String::with_capacity(64);
411    out.push_str("<ActiveTrustedSigners>");
412    out.push_str(&format!(
413        "<Enabled>{}</Enabled>",
414        cfg.trusted_signers.enabled
415    ));
416    out.push_str(&format!(
417        "<Quantity>{}</Quantity>",
418        cfg.trusted_signers.quantity
419    ));
420    if let Some(it) = &cfg.trusted_signers.items {
421        out.push_str("<Items>");
422        for a in &it.aws_account_number {
423            out.push_str("<Signer>");
424            out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
425            out.push_str("</Signer>");
426        }
427        out.push_str("</Items>");
428    }
429    out.push_str("</ActiveTrustedSigners>");
430    out
431}