1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
//! `AWS::Pipes::Pipe` CloudFormation provisioning. Creates a pipe as a real
//! record in the `pipes` service state — the same JSON shape the direct
//! `CreatePipe` handler stores — so the background Pipes runner picks it up and
//! a CFN-created pipe reads back identically to an API-created one on
//! `DescribePipe`. The pipe is created already settled (`RUNNING`, or `STOPPED`
//! when `DesiredState=STOPPED`) since CFN provisioning is synchronous. Persists
//! through the `pipes` snapshot hook (keyed off the `Pipes` resource segment),
//! so the resource survives a restart (the #1766 lesson).
use serde_json::{json, Map, Value};
use super::{ProvisionResult, ResourceDefinition, ResourceProvisioner};
/// The optional `CreatePipe` input fields, copied verbatim from the CFN
/// properties (Pipes uses PascalCase in both the API and CloudFormation, so the
/// keys map 1:1).
const PIPE_INPUT_FIELDS: &[&str] = &[
"Description",
"Source",
"SourceParameters",
"Enrichment",
"EnrichmentParameters",
"Target",
"TargetParameters",
"RoleArn",
"LogConfiguration",
"KmsKeyIdentifier",
];
/// The updatable `UpdatePipe` fields. `Source` is intentionally absent: it is
/// immutable on a pipe (a change forces a CloudFormation replacement), so an
/// in-place update never rewrites it. Mirrors the direct `UpdatePipe` handler.
const PIPE_UPDATE_FIELDS: &[&str] = &[
"Description",
"SourceParameters",
"Enrichment",
"EnrichmentParameters",
"Target",
"TargetParameters",
"RoleArn",
"LogConfiguration",
"KmsKeyIdentifier",
];
impl ResourceProvisioner {
pub(super) fn create_pipes_pipe(
&self,
resource: &ResourceDefinition,
) -> Result<ProvisionResult, String> {
let props = &resource.properties;
let name = props
.get("Name")
.and_then(Value::as_str)
.map(String::from)
.unwrap_or_else(|| resource.logical_id.clone());
// Apply the same validation the direct CreatePipe handler enforces, so a
// CFN-created pipe can't slip past name/ARN constraints the API rejects.
fakecloud_pipes::validate_pipe_name(&name).map_err(|e| e.message())?;
// Source/Target/RoleArn are required and must be non-empty (a bare
// `as_str` would accept `""`), and are bounded to the 1-1600 ARN length.
let source = require_arn_field(props, "Source")?;
let target = require_arn_field(props, "Target")?;
let role_arn = require_arn_field(props, "RoleArn")?;
let arn = format!(
"arn:aws:pipes:{}:{}:pipe/{name}",
self.region, self.account_id
);
// CFN provisioning is synchronous, so the pipe is born settled: RUNNING
// unless the template explicitly asks for STOPPED.
let desired = match props.get("DesiredState").and_then(Value::as_str) {
Some("STOPPED") => "STOPPED",
_ => "RUNNING",
};
let now = epoch_secs();
// Reject a same-name collision instead of silently overwriting an
// existing pipe (the direct CreatePipe handler returns ConflictException
// here). CloudFormation provisions a stack single-threaded, so a plain
// read-then-write is race-free.
if self
.pipes_state
.read()
.get(&self.account_id)
.is_some_and(|st| st.pipes.contains_key(&name))
{
return Err(format!("Pipe with Name {name} already exists."));
}
let mut pipe = Map::new();
pipe.insert("Name".into(), json!(name));
pipe.insert("Arn".into(), json!(arn));
pipe.insert("Source".into(), json!(source));
pipe.insert("Target".into(), json!(target));
pipe.insert("RoleArn".into(), json!(role_arn));
for field in PIPE_INPUT_FIELDS {
if let Some(v) = props.get(*field) {
pipe.insert((*field).into(), v.clone());
}
}
pipe.insert("DesiredState".into(), json!(desired));
pipe.insert("CurrentState".into(), json!(desired));
pipe.insert("StateReason".into(), json!("Pipe is healthy"));
pipe.insert("CreationTime".into(), json!(now));
pipe.insert("LastModifiedTime".into(), json!(now));
// Echo the source-typed default parameter block (e.g. SqsQueueParameters)
// just like the direct CreatePipe handler, so a CFN-created pipe reads
// back identically on DescribePipe.
fakecloud_pipes::ensure_source_param_defaults(&mut pipe, &source);
fakecloud_pipes::normalize_empty_input_templates(&mut pipe);
// Tags: AWS::Pipes::Pipe carries a JSON map; mirror the direct handler,
// which both embeds the Tags map on the pipe and indexes it in the tag
// store for ListTagsForResource.
if let Some(tags) = props.get("Tags").and_then(Value::as_object) {
let owned: Map<String, Value> = tags
.iter()
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), json!(s))))
.collect();
if !owned.is_empty() {
pipe.insert("Tags".into(), Value::Object(owned.clone()));
let mut state = self.pipes_state.write();
let entry = state
.get_or_create(&self.account_id)
.tags
.entry(arn.clone())
.or_default();
for (k, v) in owned {
if let Some(s) = v.as_str() {
entry.insert(k, s.to_string());
}
}
}
}
self.pipes_state
.write()
.get_or_create(&self.account_id)
.pipes
.insert(name.clone(), Value::Object(pipe));
// Ref returns the pipe name; GetAtt exposes Arn + the lifecycle fields.
Ok(ProvisionResult::new(name)
.with("Arn", arn)
.with("CurrentState", desired)
.with("StateReason", "Pipe is healthy")
.with("CreationTime", now.to_string())
.with("LastModifiedTime", now.to_string()))
}
/// Apply a stack UpdateStack change to an existing pipe: re-write the
/// updatable fields (full replace — an omitted field is cleared, matching
/// the direct `UpdatePipe` handler) and re-settle it. Without this arm the
/// generic `update_resource` dispatcher returns `Ok(None)` for
/// `AWS::Pipes::Pipe`, so CloudFormation reports `UPDATE_COMPLETE` while
/// silently discarding the property change.
///
/// `Source` is replacement-required on `AWS::Pipes::Pipe` (the CFN resource
/// schema marks Source create-only: "Update requires: Replacement"). A
/// changed Source therefore cannot be applied in place — doing so would leave
/// the pipe reading from the old source while reporting success. When Source
/// changes we perform a replacement (delete the old record + recreate from
/// the new properties) so the new Source actually takes effect.
pub(super) fn update_pipes_pipe(
&self,
existing: &super::StackResource,
resource: &ResourceDefinition,
) -> Result<ProvisionResult, String> {
let props = &resource.properties;
// The physical id of a pipe is its Name.
let name = existing.physical_id.clone();
let now = epoch_secs();
// Replacement-required Source change: recreate instead of in-place update.
let old_source = self
.pipes_state
.read()
.get(&self.account_id)
.and_then(|st| st.pipes.get(&name))
.and_then(|p| p.get("Source").and_then(Value::as_str).map(String::from));
let new_source = props.get("Source").and_then(Value::as_str);
if let (Some(old), Some(new)) = (old_source.as_deref(), new_source) {
if new != old {
// Drop the old record so the recreate's same-name conflict check
// passes, then rebuild the pipe from scratch with the new Source.
self.delete_pipes_pipe(&name);
return self.create_pipes_pipe(resource);
}
}
// Target/RoleArn stay required and non-empty on update; Source is
// unchanged here (a change took the replacement path above). Called for
// validation only — the applied value flows through PIPE_UPDATE_FIELDS.
require_arn_field(props, "Target")?;
require_arn_field(props, "RoleArn")?;
let desired = match props.get("DesiredState").and_then(Value::as_str) {
Some("STOPPED") => "STOPPED",
_ => "RUNNING",
};
let mut state = self.pipes_state.write();
let acct = state.get_or_create(&self.account_id);
let pipe = acct
.pipes
.get_mut(&name)
.ok_or_else(|| format!("AWS::Pipes::Pipe {name} not yet provisioned"))?;
let obj = pipe
.as_object_mut()
.ok_or_else(|| format!("corrupt pipe state for {name}"))?;
for field in PIPE_UPDATE_FIELDS {
match props.get(*field) {
Some(v) => {
obj.insert((*field).to_string(), v.clone());
}
None => {
obj.remove(*field);
}
}
}
obj.insert("DesiredState".into(), json!(desired));
// CFN provisioning is synchronous, so the pipe stays settled.
obj.insert("CurrentState".into(), json!(desired));
obj.insert("StateReason".into(), json!("Pipe is healthy"));
obj.insert("LastModifiedTime".into(), json!(now));
let source = obj
.get("Source")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
fakecloud_pipes::ensure_source_param_defaults(obj, &source);
fakecloud_pipes::normalize_empty_input_templates(obj);
let arn = obj
.get("Arn")
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
Ok(ProvisionResult::new(name)
.with("Arn", arn)
.with("CurrentState", desired)
.with("StateReason", "Pipe is healthy")
.with("LastModifiedTime", now.to_string()))
}
/// Live-state `Fn::GetAtt` for a pipe: resolves `Arn` from the pipes service
/// state (the create path also pre-captures it, so this is the consistency
/// overlay used when reading back a persisted stack).
pub(super) fn get_att_pipes_pipe(&self, physical_id: &str, attribute: &str) -> Option<String> {
let state = self.pipes_state.read();
let pipe = state.get(&self.account_id)?.pipes.get(physical_id)?;
match attribute {
"Arn" => pipe.get("Arn").and_then(Value::as_str).map(String::from),
_ => None,
}
}
/// Delete a pipe by physical id (its name). Also drops the tag-store entry
/// keyed by the pipe's ARN.
pub(super) fn delete_pipes_pipe(&self, physical_id: &str) {
let mut state = self.pipes_state.write();
let acct = state.get_or_create(&self.account_id);
if let Some(removed) = acct.pipes.remove(physical_id) {
if let Some(arn) = removed.get("Arn").and_then(Value::as_str) {
acct.tags.remove(arn);
}
}
}
}
/// Extract a required ARN-shaped property (`Source`/`Target`/`RoleArn`),
/// rejecting a missing, empty, or oversize value with the same non-empty +
/// 1-1600 length bound the direct CreatePipe/UpdatePipe handler enforces.
fn require_arn_field(props: &Value, field: &str) -> Result<String, String> {
let value = props
.get(field)
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.ok_or_else(|| format!("AWS::Pipes::Pipe requires a non-empty {field}"))?;
fakecloud_pipes::validate_resource_arn_len(field, value).map_err(|e| e.message())?;
Ok(value.to_string())
}
/// Seconds since the Unix epoch. CloudFormation provisioning is not on the
/// hot path, so a direct `SystemTime` read is fine here.
fn epoch_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}