Skip to main content

fakecloud_cloudfront/
streaming_service.rs

1//! Handlers for CloudFront Streaming Distributions (legacy RTMP).
2
3use chrono::Utc;
4use http::header::{ETAG, LOCATION};
5use http::{HeaderMap, HeaderValue, StatusCode};
6use uuid::Uuid;
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use crate::policies::{
11    not_found, precondition_failed, require_if_match, rfc3339, route_id, xml_with_etag,
12};
13use crate::router::Route;
14use crate::service::{
15    aws_error, esc, generate_etag, invalid_argument, xml_response, CloudFrontService,
16    DEFAULT_ACCOUNT,
17};
18use crate::state::Tag;
19use crate::streaming::{
20    StoredStreamingDistribution, StreamingDistributionConfig, StreamingDistributionConfigWithTags,
21};
22use crate::xml_io;
23
24const NS: &str = crate::NAMESPACE;
25const XML_DECL: &str = r#"<?xml version="1.0" encoding="UTF-8"?>"#;
26
27impl CloudFrontService {
28    pub(crate) fn create_streaming_distribution(
29        &self,
30        req: &AwsRequest,
31        with_tags: bool,
32    ) -> Result<AwsResponse, AwsServiceError> {
33        let (config, tags) = if with_tags {
34            let parsed: StreamingDistributionConfigWithTags = xml_io::from_xml_root(&req.body)
35                .map_err(|e| {
36                    invalid_argument(format!(
37                        "invalid StreamingDistributionConfigWithTags XML: {e}"
38                    ))
39                })?;
40            let tags = parsed
41                .tags
42                .items
43                .map(|i| {
44                    i.tag
45                        .into_iter()
46                        .map(|t| Tag {
47                            key: t.key,
48                            value: t.value,
49                        })
50                        .collect()
51                })
52                .unwrap_or_default();
53            (parsed.streaming_distribution_config, tags)
54        } else {
55            let parsed: StreamingDistributionConfig =
56                xml_io::from_xml_root(&req.body).map_err(|e| {
57                    invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
58                })?;
59            (parsed, Vec::new())
60        };
61        if config.caller_reference.is_empty() {
62            return Err(invalid_argument("CallerReference is required"));
63        }
64
65        let mut state = self.state.write();
66        let account = state
67            .accounts
68            .entry(DEFAULT_ACCOUNT.to_string())
69            .or_default();
70
71        if let Some(existing) = account
72            .streaming_distributions
73            .values()
74            .find(|d| d.config.caller_reference == config.caller_reference)
75        {
76            return Err(aws_error(
77                StatusCode::CONFLICT,
78                "StreamingDistributionAlreadyExists",
79                format!(
80                    "Streaming distribution with same CallerReference exists: {}",
81                    existing.id
82                ),
83            ));
84        }
85
86        let id = generate_streaming_id();
87        let now = Utc::now();
88        let etag = generate_etag();
89        let domain = format!("{}.cloudfront.net", id.to_lowercase());
90        let arn = format!(
91            "arn:aws:cloudfront::{}:streaming-distribution/{}",
92            DEFAULT_ACCOUNT, id
93        );
94
95        let stored = StoredStreamingDistribution {
96            id: id.clone(),
97            arn: arn.clone(),
98            // Real CloudFront returns `InProgress` for streaming
99            // distributions just like regular distributions and flips to
100            // `Deployed` once edge propagation completes.
101            status: "InProgress".to_string(),
102            last_modified_time: now,
103            domain_name: domain,
104            etag: etag.clone(),
105            config,
106        };
107        account
108            .streaming_distributions
109            .insert(id.clone(), stored.clone());
110        if !tags.is_empty() {
111            account.tags.insert(arn.clone(), tags);
112        }
113        drop(state);
114
115        self.schedule_streaming_distribution_deploy(id.clone());
116
117        let body = render_streaming_distribution(&stored);
118        let mut headers = HeaderMap::new();
119        if let Ok(v) = HeaderValue::from_str(&etag) {
120            headers.insert(ETAG, v);
121        }
122        if let Ok(v) = HeaderValue::from_str(&stored.arn) {
123            headers.insert(LOCATION, v);
124        }
125        Ok(xml_response(StatusCode::CREATED, body, headers))
126    }
127
128    pub(crate) fn get_streaming_distribution(
129        &self,
130        route: &Route,
131    ) -> Result<AwsResponse, AwsServiceError> {
132        let id = route_id(route, "StreamingDistribution")?;
133        let state = self.state.read();
134        let d = state
135            .accounts
136            .get(DEFAULT_ACCOUNT)
137            .and_then(|a| a.streaming_distributions.get(&id).cloned())
138            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
139        drop(state);
140        let body = render_streaming_distribution(&d);
141        Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
142    }
143
144    pub(crate) fn get_streaming_distribution_config(
145        &self,
146        route: &Route,
147    ) -> Result<AwsResponse, AwsServiceError> {
148        let id = route_id(route, "StreamingDistribution")?;
149        let state = self.state.read();
150        let d = state
151            .accounts
152            .get(DEFAULT_ACCOUNT)
153            .and_then(|a| a.streaming_distributions.get(&id).cloned())
154            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
155        drop(state);
156        let body = render_streaming_distribution_config(&d.config);
157        Ok(xml_with_etag(StatusCode::OK, body, &d.etag, None))
158    }
159
160    pub(crate) fn update_streaming_distribution(
161        &self,
162        req: &AwsRequest,
163        route: &Route,
164    ) -> Result<AwsResponse, AwsServiceError> {
165        let id = route_id(route, "StreamingDistribution")?;
166        let if_match = require_if_match(req)?;
167        let cfg: StreamingDistributionConfig = xml_io::from_xml_root(&req.body).map_err(|e| {
168            invalid_argument(format!("invalid StreamingDistributionConfig XML: {e}"))
169        })?;
170        if cfg.caller_reference.is_empty() {
171            return Err(invalid_argument("CallerReference is required"));
172        }
173
174        let mut state = self.state.write();
175        let account = state
176            .accounts
177            .get_mut(DEFAULT_ACCOUNT)
178            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
179        let d = account
180            .streaming_distributions
181            .get_mut(&id)
182            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
183        if d.etag != if_match {
184            return Err(precondition_failed());
185        }
186        if d.config.caller_reference != cfg.caller_reference {
187            return Err(invalid_argument(
188                "CallerReference cannot change on UpdateStreamingDistribution",
189            ));
190        }
191        // ETag stability: only bump when the config actually changes. A
192        // no-op UpdateStreamingDistribution leaves the ETag and status
193        // untouched, matching real CloudFront.
194        let config_changed = !streaming_configs_equal(&d.config, &cfg);
195        if config_changed {
196            d.config = cfg;
197            d.etag = generate_etag();
198            d.last_modified_time = Utc::now();
199            // UpdateStreamingDistribution kicks off fresh edge
200            // propagation; AWS flips status back to `InProgress` until
201            // the new config lands.
202            d.status = "InProgress".to_string();
203        }
204        let snap = d.clone();
205        drop(state);
206
207        if config_changed {
208            self.schedule_streaming_distribution_deploy(id.clone());
209        }
210
211        let body = render_streaming_distribution(&snap);
212        Ok(xml_with_etag(StatusCode::OK, body, &snap.etag, None))
213    }
214
215    pub(crate) fn delete_streaming_distribution(
216        &self,
217        req: &AwsRequest,
218        route: &Route,
219    ) -> Result<AwsResponse, AwsServiceError> {
220        let id = route_id(route, "StreamingDistribution")?;
221        let if_match = require_if_match(req)?;
222        let mut state = self.state.write();
223        let account = state
224            .accounts
225            .get_mut(DEFAULT_ACCOUNT)
226            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
227        let d = account
228            .streaming_distributions
229            .get(&id)
230            .ok_or_else(|| not_found("StreamingDistribution", &id))?;
231        if d.etag != if_match {
232            return Err(precondition_failed());
233        }
234        if d.config.enabled {
235            return Err(aws_error(
236                StatusCode::PRECONDITION_FAILED,
237                "StreamingDistributionNotDisabled",
238                "StreamingDistribution must be disabled before deletion",
239            ));
240        }
241        let arn = d.arn.clone();
242        account.streaming_distributions.remove(&id);
243        // Tags are keyed by ARN — drop them too so ListTagsForResource
244        // doesn't return tags for a deleted resource.
245        account.tags.remove(&arn);
246        drop(state);
247        Ok(crate::policies::empty(StatusCode::NO_CONTENT))
248    }
249
250    pub(crate) fn list_streaming_distributions(
251        &self,
252        _req: &AwsRequest,
253    ) -> Result<AwsResponse, AwsServiceError> {
254        let state = self.state.read();
255        let mut items: Vec<StoredStreamingDistribution> = state
256            .accounts
257            .get(DEFAULT_ACCOUNT)
258            .map(|a| a.streaming_distributions.values().cloned().collect())
259            .unwrap_or_default();
260        drop(state);
261        items.sort_by(|a, b| a.id.cmp(&b.id));
262
263        let mut body = String::with_capacity(512);
264        body.push_str(XML_DECL);
265        body.push_str(&format!("<StreamingDistributionList xmlns=\"{NS}\">"));
266        body.push_str("<Marker></Marker>");
267        body.push_str("<MaxItems>100</MaxItems>");
268        body.push_str("<IsTruncated>false</IsTruncated>");
269        body.push_str(&format!("<Quantity>{}</Quantity>", items.len()));
270        body.push_str("<Items>");
271        for d in &items {
272            body.push_str("<StreamingDistributionSummary>");
273            push_summary_inner(&mut body, d);
274            body.push_str("</StreamingDistributionSummary>");
275        }
276        body.push_str("</Items>");
277        body.push_str("</StreamingDistributionList>");
278        Ok(xml_response(StatusCode::OK, body, HeaderMap::new()))
279    }
280}
281
282// ─── Helpers ──────────────────────────────────────────────────────────
283
284fn generate_streaming_id() -> String {
285    let raw = Uuid::new_v4().simple().to_string().to_uppercase();
286    format!("S{}", &raw[..13])
287}
288
289fn render_streaming_distribution(d: &StoredStreamingDistribution) -> String {
290    let mut out = String::with_capacity(512);
291    out.push_str(XML_DECL);
292    out.push_str(&format!("<StreamingDistribution xmlns=\"{NS}\">"));
293    out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
294    out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
295    out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
296    out.push_str(&format!(
297        "<LastModifiedTime>{}</LastModifiedTime>",
298        rfc3339(&d.last_modified_time)
299    ));
300    out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
301    out.push_str(&render_active_trusted_signers(&d.config));
302    out.push_str(&serialize_config_inner(&d.config));
303    out.push_str("</StreamingDistribution>");
304    out
305}
306
307fn render_streaming_distribution_config(cfg: &StreamingDistributionConfig) -> String {
308    let mut out = String::with_capacity(512);
309    out.push_str(XML_DECL);
310    out.push_str(&format!("<StreamingDistributionConfig xmlns=\"{NS}\">"));
311    out.push_str(&serialize_config_body(cfg));
312    out.push_str("</StreamingDistributionConfig>");
313    out
314}
315
316fn push_summary_inner(out: &mut String, d: &StoredStreamingDistribution) {
317    out.push_str(&format!("<Id>{}</Id>", esc(&d.id)));
318    out.push_str(&format!("<ARN>{}</ARN>", esc(&d.arn)));
319    out.push_str(&format!("<Status>{}</Status>", esc(&d.status)));
320    out.push_str(&format!(
321        "<LastModifiedTime>{}</LastModifiedTime>",
322        rfc3339(&d.last_modified_time)
323    ));
324    out.push_str(&format!("<DomainName>{}</DomainName>", esc(&d.domain_name)));
325    // S3Origin
326    out.push_str("<S3Origin>");
327    out.push_str(&format!(
328        "<DomainName>{}</DomainName>",
329        esc(&d.config.s3_origin.domain_name)
330    ));
331    out.push_str(&format!(
332        "<OriginAccessIdentity>{}</OriginAccessIdentity>",
333        esc(&d.config.s3_origin.origin_access_identity)
334    ));
335    out.push_str("</S3Origin>");
336    out.push_str(&render_aliases(&d.config));
337    out.push_str(&render_trusted_signers(&d.config));
338    out.push_str(&format!("<Comment>{}</Comment>", esc(&d.config.comment)));
339    out.push_str(&format!(
340        "<PriceClass>{}</PriceClass>",
341        esc(&d.config.price_class)
342    ));
343    out.push_str(&format!("<Enabled>{}</Enabled>", d.config.enabled));
344}
345
346fn serialize_config_inner(cfg: &StreamingDistributionConfig) -> String {
347    let mut out = String::with_capacity(512);
348    out.push_str("<StreamingDistributionConfig>");
349    out.push_str(&serialize_config_body(cfg));
350    out.push_str("</StreamingDistributionConfig>");
351    out
352}
353
354fn serialize_config_body(cfg: &StreamingDistributionConfig) -> String {
355    let mut out = String::with_capacity(512);
356    out.push_str(&format!(
357        "<CallerReference>{}</CallerReference>",
358        esc(&cfg.caller_reference)
359    ));
360    out.push_str("<S3Origin>");
361    out.push_str(&format!(
362        "<DomainName>{}</DomainName>",
363        esc(&cfg.s3_origin.domain_name)
364    ));
365    out.push_str(&format!(
366        "<OriginAccessIdentity>{}</OriginAccessIdentity>",
367        esc(&cfg.s3_origin.origin_access_identity)
368    ));
369    out.push_str("</S3Origin>");
370    out.push_str(&render_aliases(cfg));
371    out.push_str(&format!("<Comment>{}</Comment>", esc(&cfg.comment)));
372    if let Some(log) = &cfg.logging {
373        out.push_str("<Logging>");
374        out.push_str(&format!("<Enabled>{}</Enabled>", log.enabled));
375        out.push_str(&format!("<Bucket>{}</Bucket>", esc(&log.bucket)));
376        out.push_str(&format!("<Prefix>{}</Prefix>", esc(&log.prefix)));
377        out.push_str("</Logging>");
378    }
379    out.push_str(&render_trusted_signers(cfg));
380    out.push_str(&format!(
381        "<PriceClass>{}</PriceClass>",
382        esc(&cfg.price_class)
383    ));
384    out.push_str(&format!("<Enabled>{}</Enabled>", cfg.enabled));
385    out
386}
387
388fn render_aliases(cfg: &StreamingDistributionConfig) -> String {
389    let mut out = String::with_capacity(64);
390    let (qty, items) = match &cfg.aliases {
391        Some(a) => (a.quantity, a.items.clone()),
392        None => (0, None),
393    };
394    out.push_str("<Aliases>");
395    out.push_str(&format!("<Quantity>{}</Quantity>", qty));
396    if let Some(it) = items {
397        out.push_str("<Items>");
398        for c in &it.cname {
399            out.push_str(&format!("<CNAME>{}</CNAME>", esc(c)));
400        }
401        out.push_str("</Items>");
402    }
403    out.push_str("</Aliases>");
404    out
405}
406
407fn render_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
408    let mut out = String::with_capacity(64);
409    out.push_str("<TrustedSigners>");
410    out.push_str(&format!(
411        "<Enabled>{}</Enabled>",
412        cfg.trusted_signers.enabled
413    ));
414    out.push_str(&format!(
415        "<Quantity>{}</Quantity>",
416        cfg.trusted_signers.quantity
417    ));
418    if let Some(it) = &cfg.trusted_signers.items {
419        out.push_str("<Items>");
420        for a in &it.aws_account_number {
421            out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
422        }
423        out.push_str("</Items>");
424    }
425    out.push_str("</TrustedSigners>");
426    out
427}
428
429fn render_active_trusted_signers(cfg: &StreamingDistributionConfig) -> String {
430    let mut out = String::with_capacity(64);
431    out.push_str("<ActiveTrustedSigners>");
432    out.push_str(&format!(
433        "<Enabled>{}</Enabled>",
434        cfg.trusted_signers.enabled
435    ));
436    out.push_str(&format!(
437        "<Quantity>{}</Quantity>",
438        cfg.trusted_signers.quantity
439    ));
440    if let Some(it) = &cfg.trusted_signers.items {
441        out.push_str("<Items>");
442        for a in &it.aws_account_number {
443            out.push_str("<Signer>");
444            out.push_str(&format!("<AwsAccountNumber>{}</AwsAccountNumber>", esc(a)));
445            out.push_str("</Signer>");
446        }
447        out.push_str("</Items>");
448    }
449    out.push_str("</ActiveTrustedSigners>");
450    out
451}
452
453/// Compare two `StreamingDistributionConfig`s by their canonical XML
454/// representation. Mirrors `service::configs_equal` for the streaming
455/// (RTMP) distribution variant; used so a no-op `UpdateStreamingDistribution`
456/// leaves the ETag stable.
457fn streaming_configs_equal(
458    lhs: &StreamingDistributionConfig,
459    rhs: &StreamingDistributionConfig,
460) -> bool {
461    let Ok(a) = xml_io::to_xml_root("StreamingDistributionConfig", lhs) else {
462        return false;
463    };
464    let Ok(b) = xml_io::to_xml_root("StreamingDistributionConfig", rhs) else {
465        return false;
466    };
467    a == b
468}