use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use base64::Engine;
use chrono::{DateTime, Utc};
use serde_json::{json, Value};
use fakecloud_core::delivery::{DeliveryBus, KmsHook};
use fakecloud_dynamodb::SharedDynamoDbState;
use fakecloud_kinesis::SharedKinesisState;
use fakecloud_lambda::filter::FilterSet;
use fakecloud_persistence::SnapshotHook;
use fakecloud_pipes::SharedPipesState;
use fakecloud_sqs::SharedSqsState;
/// Kind of event source a pipe polls, as inferred from the pipe's source ARN.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SourceKind {
    /// The source ARN names an SQS queue.
    Sqs,
    /// The source ARN names a Kinesis stream.
    Kinesis,
    /// The source ARN names a DynamoDB table stream.
    DynamoDbStream,
}
/// Snapshot of one pipe in the `RUNNING` state, copied out of the shared
/// pipes state so it can be processed without holding that lock.
struct RunningPipe {
/// The pipe's `Arn`; falls back to the source ARN when the pipe has none.
arn: String,
/// The pipe's `Source` ARN.
source_arn: String,
/// Source type derived from `source_arn`.
source_kind: SourceKind,
/// The pipe's `Target` ARN.
target_arn: String,
/// Raw `TargetParameters` object, if present.
target_params: Option<Value>,
/// Non-empty `TargetParameters.InputTemplate`, if any.
input_template: Option<String>,
/// Non-empty `Enrichment` ARN, if any.
enrichment: Option<String>,
/// Non-empty `EnrichmentParameters.InputTemplate`, if any.
enrichment_input_template: Option<String>,
/// Filter built from `SourceParameters.FilterCriteria.Filters[*].Pattern`.
filter: FilterSet,
/// Maximum number of records taken from the source per poll.
batch_size: usize,
/// `StartingPosition` for stream sources; `None` for SQS.
starting_position: Option<String>,
/// `StartingPositionTimestamp` for stream sources; `None` for SQS.
starting_position_timestamp: Option<f64>,
}
/// Background worker that polls every running pipe's source, applies
/// filtering, enrichment and input templates, and delivers to the target.
pub struct PipesRunner {
/// Shared pipes state: pipe definitions and per-source checkpoints.
pipes_state: SharedPipesState,
/// Shared SQS state used for SQS sources.
sqs_state: SharedSqsState,
/// Shared Kinesis state; Kinesis-sourced pipes are skipped while `None`.
kinesis_state: Option<SharedKinesisState>,
/// Shared DynamoDB state; DynamoDB-stream pipes are skipped while `None`.
dynamodb_state: Option<SharedDynamoDbState>,
/// Bus used to invoke enrichment Lambdas and deliver to targets.
delivery: Arc<DeliveryBus>,
/// Optional hook used to decrypt SQS message bodies.
kms_hook: Option<Arc<dyn KmsHook>>,
/// Optional hook awaited after a poll cycle that changed state.
persist_hook: Option<SnapshotHook>,
/// Set when state changed during a poll cycle; cleared at the end of `poll`.
checkpoints_dirty: Arc<AtomicBool>,
}
impl PipesRunner {
pub fn new(
pipes_state: SharedPipesState,
sqs_state: SharedSqsState,
delivery: Arc<DeliveryBus>,
) -> Self {
Self {
pipes_state,
sqs_state,
kinesis_state: None,
dynamodb_state: None,
delivery,
kms_hook: None,
persist_hook: None,
checkpoints_dirty: Arc::new(AtomicBool::new(false)),
}
}
pub fn with_persist_hook(mut self, hook: SnapshotHook) -> Self {
self.persist_hook = Some(hook);
self
}
pub fn with_kinesis_state(mut self, state: SharedKinesisState) -> Self {
self.kinesis_state = Some(state);
self
}
pub fn with_dynamodb_state(mut self, state: SharedDynamoDbState) -> Self {
self.dynamodb_state = Some(state);
self
}
pub fn with_kms_hook(mut self, hook: Arc<dyn KmsHook>) -> Self {
self.kms_hook = Some(hook);
self
}
/// Runs the polling loop forever: one `poll` cycle, then a one-second sleep.
pub async fn run(self) {
    let pause = Duration::from_secs(1);
    let runner = Arc::new(self);
    loop {
        runner.poll().await;
        tokio::time::sleep(pause).await;
    }
}
async fn poll(self: &Arc<Self>) {
if fakecloud_pipes::drain_overdue_transient_pipes(&self.pipes_state, 1.0) > 0 {
self.checkpoints_dirty.store(true, Ordering::Relaxed);
}
let mut set = tokio::task::JoinSet::new();
for pipe in self.collect_running_pipes() {
let me = Arc::clone(self);
set.spawn(async move {
match pipe.source_kind {
SourceKind::Sqs => me.process_sqs_pipe(&pipe).await,
SourceKind::Kinesis => me.process_kinesis_pipe(&pipe).await,
SourceKind::DynamoDbStream => me.process_ddb_pipe(&pipe).await,
}
});
}
while set.join_next().await.is_some() {}
if self.checkpoints_dirty.swap(false, Ordering::Relaxed) {
if let Some(hook) = &self.persist_hook {
hook().await;
}
}
}
fn collect_running_pipes(&self) -> Vec<RunningPipe> {
let accounts = self.pipes_state.read();
let mut out = Vec::new();
for state in accounts.accounts.values() {
for pipe in state.pipes.values() {
if pipe.get("CurrentState").and_then(Value::as_str) != Some("RUNNING") {
continue;
}
let Some(source_arn) = pipe.get("Source").and_then(Value::as_str) else {
continue;
};
let source_kind = if source_arn.contains(":sqs:") {
SourceKind::Sqs
} else if source_arn.contains(":kinesis:") {
SourceKind::Kinesis
} else if source_arn.contains(":dynamodb:") && source_arn.contains("/stream/") {
SourceKind::DynamoDbStream
} else {
continue;
};
let Some(target_arn) = pipe.get("Target").and_then(Value::as_str) else {
continue;
};
let source_params = pipe.get("SourceParameters");
let filter = build_filter(source_params);
let batch_size = source_batch_size(source_params, &source_kind);
let (starting_position, starting_position_timestamp) =
starting_position(source_params, &source_kind);
let target_params = pipe.get("TargetParameters").cloned();
let input_template = target_params
.as_ref()
.and_then(|p| p.get("InputTemplate"))
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(str::to_string);
let enrichment = pipe
.get("Enrichment")
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(str::to_string);
let enrichment_input_template = pipe
.get("EnrichmentParameters")
.and_then(|p| p.get("InputTemplate"))
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(str::to_string);
let arn = pipe
.get("Arn")
.and_then(Value::as_str)
.unwrap_or(source_arn)
.to_string();
out.push(RunningPipe {
arn,
source_arn: source_arn.to_string(),
source_kind,
target_arn: target_arn.to_string(),
target_params,
input_template,
enrichment,
enrichment_input_template,
filter,
batch_size,
starting_position,
starting_position_timestamp,
});
}
}
out
}
/// Polls an SQS-sourced pipe once.
///
/// Messages rejected by the pipe filter are deleted from the queue.
/// Messages that match are deleted only after enrichment and delivery
/// succeed; otherwise they stay in the queue, hidden until the visibility
/// deadline set by `pick_messages` passes.
async fn process_sqs_pipe(&self, pipe: &RunningPipe) {
    let now = Utc::now();
    // Cheap read-locked probe so idle queues never take the write lock.
    if !self.sqs_source_has_visible_messages(&pipe.source_arn, now) {
        return;
    }
    let picked = self.pick_messages(&pipe.source_arn, pipe.batch_size, now);
    if picked.is_empty() {
        return;
    }
    // Keep ids and events in separate vectors so the events can be handed to
    // delivery by value, without cloning each one.
    let mut ack_ids: Vec<String> = Vec::new();
    let mut matched_ids: Vec<String> = Vec::new();
    let mut matched_events: Vec<Value> = Vec::new();
    for (id, event) in picked {
        if pipe.filter.is_empty() || pipe.filter.matches(&event) {
            matched_ids.push(id);
            matched_events.push(event);
        } else {
            ack_ids.push(id);
        }
    }
    if !matched_events.is_empty() && self.enrich_and_deliver(pipe, matched_events).await {
        ack_ids.extend(matched_ids);
    }
    if !ack_ids.is_empty() {
        self.delete_messages(&pipe.source_arn, &ack_ids);
    }
}
/// Reports whether the queue named by `source_arn` holds at least one message
/// that is visible at `now` (no visibility deadline, or one that has passed).
///
/// Returns `false` when the account or the queue cannot be found.
fn sqs_source_has_visible_messages(&self, source_arn: &str, now: DateTime<Utc>) -> bool {
    let account_id = source_arn.split(':').nth(4).unwrap_or("");
    let guard = self.sqs_state.read();
    let Some(account) = guard.get(account_id) else {
        return false;
    };
    for queue in account.queues.values() {
        if queue.arn == source_arn {
            return queue.messages.iter().any(|m| match m.visible_at {
                Some(deadline) => deadline <= now,
                None => true,
            });
        }
    }
    false
}
/// Claims up to `limit` visible messages from the queue named by `source_arn`.
///
/// Each claimed message is hidden until `now` plus the queue's
/// `VisibilityTimeout` (30 seconds when unset or unparsable). Returns
/// `(message_id, source_event)` pairs; bodies are passed through
/// `decrypt_body` first when the queue is configured for encryption.
fn pick_messages(
    &self,
    source_arn: &str,
    limit: usize,
    now: DateTime<Utc>,
) -> Vec<(String, Value)> {
    let mut sqs_mas = self.sqs_state.write();
    let default_acct = sqs_mas.default_account_id().to_string();
    let acct = source_arn
        .split(':')
        .nth(4)
        .unwrap_or(&default_acct)
        .to_string();
    // Borrowed straight from the ARN; no owned copy is needed.
    let region = source_arn.split(':').nth(3).unwrap_or("us-east-1");
    let sqs = sqs_mas.get_or_create(&acct);
    let Some(queue) = sqs.queues.values_mut().find(|q| q.arn == source_arn) else {
        return Vec::new();
    };
    let visibility_timeout: i64 = queue
        .attributes
        .get("VisibilityTimeout")
        .and_then(|s| s.parse().ok())
        .unwrap_or(30);
    let visible_at = now + chrono::Duration::seconds(visibility_timeout);
    // Encrypted when a KMS key is configured or SQS-managed SSE is enabled.
    let encrypted = queue
        .attributes
        .get("KmsMasterKeyId")
        .map(|k| !k.is_empty())
        .unwrap_or(false)
        || queue
            .attributes
            .get("SqsManagedSseEnabled")
            .map(String::as_str)
            == Some("true");
    let mut out = Vec::new();
    for msg in queue.messages.iter_mut() {
        if out.len() >= limit {
            break;
        }
        // Skip messages still hidden by an earlier claim.
        if let Some(vis) = msg.visible_at {
            if vis > now {
                continue;
            }
        }
        msg.visible_at = Some(visible_at);
        let body = self.decrypt_body(&msg.body, source_arn, &acct, encrypted);
        out.push((
            msg.message_id.clone(),
            sqs_source_event(msg, &body, source_arn, region),
        ));
    }
    out
}
/// Returns the plaintext of an SQS message body.
///
/// The body is returned unchanged unless the queue is encrypted, the body
/// looks like a fakecloud KMS envelope, and a KMS hook is configured. A
/// failed decrypt is logged and the opaque body is forwarded as-is.
fn decrypt_body(&self, body: &str, source_arn: &str, account: &str, encrypted: bool) -> String {
    let hook = match self.kms_hook.as_ref() {
        Some(hook) if encrypted && looks_like_fakecloud_envelope(body) => hook,
        _ => return body.to_string(),
    };
    let ctx = HashMap::from([("aws:sqs:arn".to_string(), source_arn.to_string())]);
    hook.decrypt(account, body, "sqs.amazonaws.com", ctx)
        .map(|bytes| String::from_utf8_lossy(&bytes).to_string())
        .unwrap_or_else(|err| {
            tracing::warn!(%source_arn, %err, "pipes: KMS decrypt failed; forwarding opaque body");
            body.to_string()
        })
}
/// Removes every message whose id is in `ids` from the queue named by
/// `source_arn`; does nothing when the queue cannot be found.
fn delete_messages(&self, source_arn: &str, ids: &[String]) {
    let mut guard = self.sqs_state.write();
    let fallback_account = guard.default_account_id().to_string();
    let account_id = source_arn
        .split(':')
        .nth(4)
        .map_or(fallback_account, str::to_string);
    let account = guard.get_or_create(&account_id);
    for queue in account.queues.values_mut() {
        if queue.arn == source_arn {
            queue.messages.retain(|m| !ids.contains(&m.message_id));
            break;
        }
    }
}
/// Polls a Kinesis-sourced pipe once, shard by shard.
///
/// For each shard the next window of at most `batch_size` records after the
/// stored checkpoint is delivered, and the checkpoint advances to the
/// window's last sequence number only on success. A shard with no checkpoint
/// is first seeded from the pipe's starting position.
async fn process_kinesis_pipe(&self, pipe: &RunningPipe) {
    let Some(kinesis_state) = self.kinesis_state.as_ref() else {
        return;
    };
    let account = arn_account(&pipe.source_arn);
    let region = arn_region(&pipe.source_arn);
    /// Records copied out of one shard, plus what is needed to checkpoint them.
    struct ShardWindow {
        checkpoint_key: String,
        shard_id: String,
        last_seq: String,
        records: Vec<fakecloud_kinesis::KinesisRecord>,
    }
    // Copy the windows out under the read lock so nothing is awaited while
    // the Kinesis state is locked.
    let windows: Vec<ShardWindow> = {
        let ks = kinesis_state.read();
        let Some(kinesis) = ks.get(&account) else {
            return;
        };
        let Some(stream) = kinesis
            .streams
            .values()
            .find(|s| s.stream_arn == pipe.source_arn)
        else {
            return;
        };
        let mut seeds: Vec<(String, String)> = Vec::new();
        let mut windows = Vec::new();
        for shard in &stream.shards {
            // Checkpoints are tracked per pipe and per shard.
            let key = format!("{}#{}", pipe.arn, shard.shard_id);
            let checkpoint = match self.checkpoint(&account, &key) {
                Some(cp) => cp,
                None => {
                    let init = kinesis_start_index(
                        &shard.records,
                        pipe.starting_position.as_deref(),
                        pipe.starting_position_timestamp,
                    );
                    // The seed is the sequence number just before the first
                    // record to deliver; empty means "from the beginning".
                    let seed = if init == 0 {
                        String::new()
                    } else {
                        shard.records[init - 1].sequence_number.clone()
                    };
                    seeds.push((key.clone(), seed.clone()));
                    seed
                }
            };
            let start = kinesis_window_start(&shard.records, &checkpoint);
            if start >= shard.records.len() {
                continue;
            }
            let end = shard
                .records
                .len()
                .min(start.saturating_add(pipe.batch_size));
            windows.push(ShardWindow {
                checkpoint_key: key,
                shard_id: shard.shard_id.clone(),
                last_seq: shard.records[end - 1].sequence_number.clone(),
                records: shard.records[start..end].to_vec(),
            });
        }
        // Release the Kinesis lock before writing the seeds to the pipes state.
        drop(ks);
        for (key, seed) in seeds {
            self.set_checkpoint(&account, &key, seed);
        }
        windows
    };
    for window in windows {
        let events: Vec<Value> = window
            .records
            .iter()
            .map(|r| kinesis_source_event(r, &window.shard_id, &pipe.source_arn, &region))
            .collect();
        if self.process_stream_window(pipe, events).await {
            self.set_checkpoint(&account, &window.checkpoint_key, window.last_seq);
        }
    }
}
async fn process_ddb_pipe(&self, pipe: &RunningPipe) {
let Some(dynamodb_state) = self.dynamodb_state.as_ref() else {
return;
};
let account = arn_account(&pipe.source_arn);
let Some(table_name) = pipe
.source_arn
.split(":table/")
.nth(1)
.and_then(|t| t.split('/').next())
.map(str::to_string)
else {
return;
};
if self.checkpoint(&account, &pipe.arn).is_none() {
let init = {
let ds = dynamodb_state.read();
let Some(dynamodb) = ds.get(&account) else {
return;
};
let Some(table) = dynamodb.tables.get(&table_name) else {
return;
};
if pipe.starting_position.as_deref() == Some("LATEST") {
let records = table.stream_records.read();
records
.iter()
.map(|r| r.dynamodb.sequence_number.clone())
.max()
.unwrap_or_default()
} else {
String::new()
}
};
self.set_checkpoint(&account, &pipe.arn, init);
}
let checkpoint = self.checkpoint(&account, &pipe.arn).unwrap_or_default();
let (last_seq, events) = {
let ds = dynamodb_state.read();
let Some(dynamodb) = ds.get(&account) else {
return;
};
let Some(table) = dynamodb.tables.get(&table_name) else {
return;
};
if !table.stream_enabled {
return;
}
let stream_records = table.stream_records.read();
let mut window: Vec<_> = stream_records
.iter()
.filter(|r| {
checkpoint.is_empty()
|| r.dynamodb.sequence_number.as_str() > checkpoint.as_str()
})
.take(pipe.batch_size)
.cloned()
.collect();
window.sort_by(|a, b| a.dynamodb.sequence_number.cmp(&b.dynamodb.sequence_number));
let last_seq = window.last().map(|r| r.dynamodb.sequence_number.clone());
let events: Vec<Value> = window.iter().map(ddb_source_event).collect();
(last_seq, events)
};
let Some(last_seq) = last_seq else {
return;
};
if self.process_stream_window(pipe, events).await {
self.set_checkpoint(&account, &pipe.arn, last_seq);
}
}
/// Filters a window of stream events and delivers the survivors.
///
/// Returns `true` when the window may be checkpointed: either nothing matched
/// the filter or enrichment and delivery succeeded.
async fn process_stream_window(&self, pipe: &RunningPipe, events: Vec<Value>) -> bool {
    let mut matched = events;
    if !pipe.filter.is_empty() {
        matched.retain(|event| pipe.filter.matches(event));
    }
    if matched.is_empty() {
        true
    } else {
        self.enrich_and_deliver(pipe, matched).await
    }
}
/// Runs enrichment, applies the target input template, and delivers the batch.
///
/// Returns `false` when enrichment or delivery fails, and `true` when
/// delivery succeeds or enrichment returned no events at all.
async fn enrich_and_deliver(&self, pipe: &RunningPipe, events: Vec<Value>) -> bool {
    let Some(enriched) = self.enrich(pipe, events).await else {
        return false;
    };
    if enriched.is_empty() {
        return true;
    }
    let template = pipe.input_template.as_deref();
    let mut transformed: Vec<Value> = Vec::with_capacity(enriched.len());
    for event in &enriched {
        transformed.push(apply_input_template(template, event));
    }
    self.deliver(&pipe.target_arn, pipe.target_params.as_ref(), &transformed)
        .await
}
/// Passes `events` through the pipe's enrichment step, if one is configured.
///
/// Without an enrichment ARN the events are returned untouched. Only Lambda
/// enrichment is supported; `None` means the events must be held for a later
/// retry (unsupported enrichment type, failed invocation, or no result from
/// the delivery bus).
async fn enrich(&self, pipe: &RunningPipe, events: Vec<Value>) -> Option<Vec<Value>> {
    let enr_arn = match pipe.enrichment.as_deref() {
        Some(arn) => arn,
        None => return Some(events),
    };
    if !enr_arn.contains(":lambda:") {
        tracing::warn!(%enr_arn, "pipes: unsupported enrichment type; events held");
        return None;
    }
    let template = pipe.enrichment_input_template.as_deref();
    let input: Vec<Value> = events
        .iter()
        .map(|event| apply_input_template(template, event))
        .collect();
    let payload = serde_json::to_string(&Value::Array(input)).unwrap_or_else(|_| "[]".into());
    match self.delivery.invoke_lambda(enr_arn, &payload).await? {
        Ok(bytes) => Some(enrichment_output(&bytes)),
        Err(err) => {
            tracing::warn!(%enr_arn, %err, "pipes: enrichment Lambda failed; events will be retried");
            None
        }
    }
}
/// Delivers `batch` to the pipe target, dispatching on the target ARN.
///
/// Lambda receives the whole batch as one JSON array and reports success only
/// when the invocation succeeds. SQS, SNS, Step Functions, EventBridge and
/// Kinesis receive one call per event and always report success. Any other
/// target is logged and reported as a failure, so the events are held.
async fn deliver(
    &self,
    target_arn: &str,
    target_params: Option<&Value>,
    batch: &[Value],
) -> bool {
    if target_arn.contains(":lambda:") {
        // A slice of `Value` serializes as a JSON array directly, so the
        // batch does not need to be cloned into a `Value::Array`.
        let payload = serde_json::to_string(batch).unwrap_or_else(|_| "[]".to_string());
        match self.delivery.invoke_lambda(target_arn, &payload).await {
            Some(Ok(_)) => true,
            Some(Err(err)) => {
                tracing::warn!(%target_arn, %err, "pipes: Lambda target invocation failed; events will be retried");
                false
            }
            None => false,
        }
    } else if target_arn.contains(":sqs:") {
        for event in batch {
            let body = event_to_payload(event);
            self.delivery
                .send_to_sqs(target_arn, &body, &HashMap::new());
        }
        true
    } else if target_arn.contains(":sns:") {
        for event in batch {
            let body = event_to_payload(event);
            self.delivery.publish_to_sns(target_arn, &body, None);
        }
        true
    } else if target_arn.contains(":states:") {
        for event in batch {
            let input = event_to_payload(event);
            self.delivery
                .start_stepfunctions_execution(target_arn, &input);
        }
        true
    } else if target_arn.contains(":events:") {
        // The bus name is whatever follows "event-bus/" in the ARN.
        let bus_name = target_arn
            .rsplit_once("event-bus/")
            .map(|(_, n)| n)
            .unwrap_or("default");
        let eb_params = target_params.and_then(|p| p.get("EventBridgeEventBusParameters"));
        let source = eb_params
            .and_then(|p| p.get("Source"))
            .and_then(Value::as_str)
            .unwrap_or("Pipes");
        let detail_type = eb_params
            .and_then(|p| p.get("DetailType"))
            .and_then(Value::as_str)
            .unwrap_or("Event");
        for event in batch {
            let detail = event_to_payload(event);
            self.delivery
                .put_event_to_eventbridge(source, detail_type, &detail, bus_name);
        }
        true
    } else if target_arn.contains(":kinesis:") {
        let configured_pk = target_params
            .and_then(|p| p.get("KinesisStreamParameters"))
            .and_then(|p| p.get("PartitionKey"))
            .and_then(Value::as_str)
            .filter(|s| !s.is_empty());
        for (idx, event) in batch.iter().enumerate() {
            let data = event_to_payload(event);
            // Without a configured key, use the event's index in the batch.
            let fallback = idx.to_string();
            let pk = configured_pk.unwrap_or(fallback.as_str());
            self.delivery.send_to_kinesis(target_arn, &data, pk);
        }
        true
    } else {
        // Mirror the enrichment path: say why the events are being held
        // rather than retrying forever in silence.
        tracing::warn!(%target_arn, "pipes: unsupported target type; events held");
        false
    }
}
/// Looks up the stored source checkpoint for `key` in `account`, if any.
fn checkpoint(&self, account: &str, key: &str) -> Option<String> {
    let guard = self.pipes_state.read();
    let state = guard.get(account)?;
    state.source_checkpoints.get(key).cloned()
}
/// Stores `value` as the source checkpoint for `key` in `account` (creating
/// the account state if needed) and flags the state as dirty.
fn set_checkpoint(&self, account: &str, key: &str, value: String) {
    {
        let mut guard = self.pipes_state.write();
        let state = guard.get_or_create(account);
        state.source_checkpoints.insert(key.to_string(), value);
    }
    self.checkpoints_dirty.store(true, Ordering::Relaxed);
}
}
/// Builds the pipe's filter set from `FilterCriteria.Filters[*].Pattern`.
///
/// Filters without a string `Pattern` are ignored; missing criteria yield a
/// filter set built from no patterns.
fn build_filter(source_params: Option<&Value>) -> FilterSet {
    let filters = source_params
        .and_then(|params| params.get("FilterCriteria"))
        .and_then(|criteria| criteria.get("Filters"))
        .and_then(Value::as_array);
    let mut patterns: Vec<String> = Vec::new();
    if let Some(filters) = filters {
        for filter in filters {
            if let Some(pattern) = filter.get("Pattern").and_then(Value::as_str) {
                patterns.push(pattern.to_string());
            }
        }
    }
    FilterSet::from_strings(patterns)
}
/// Resolves the effective batch size for a source.
///
/// Reads `BatchSize` from the source-specific parameter object, falls back to
/// the per-source default when it is missing or not a positive integer, and
/// caps the result at the per-source maximum.
fn source_batch_size(source_params: Option<&Value>, kind: &SourceKind) -> usize {
    let (param_key, default, max): (&str, i64, i64) = match kind {
        SourceKind::Sqs => ("SqsQueueParameters", 10, 10),
        SourceKind::Kinesis => ("KinesisStreamParameters", 100, 10_000),
        SourceKind::DynamoDbStream => ("DynamoDBStreamParameters", 100, 10_000),
    };
    let requested = source_params
        .and_then(|params| params.get(param_key))
        .and_then(|params| params.get("BatchSize"))
        .and_then(Value::as_i64);
    let size = match requested {
        Some(n) if n > 0 => n,
        _ => default,
    };
    size.min(max) as usize
}
/// Reads `StartingPosition` and `StartingPositionTimestamp` from the
/// source-specific parameter object of a stream source.
///
/// SQS sources have no starting position, so both values are `None`.
fn starting_position(
    source_params: Option<&Value>,
    kind: &SourceKind,
) -> (Option<String>, Option<f64>) {
    let params = match kind {
        SourceKind::Sqs => return (None, None),
        SourceKind::Kinesis => source_params.and_then(|p| p.get("KinesisStreamParameters")),
        SourceKind::DynamoDbStream => {
            source_params.and_then(|p| p.get("DynamoDBStreamParameters"))
        }
    };
    let Some(params) = params else {
        return (None, None);
    };
    let position = params
        .get("StartingPosition")
        .and_then(Value::as_str)
        .map(str::to_string);
    let timestamp = params
        .get("StartingPositionTimestamp")
        .and_then(Value::as_f64);
    (position, timestamp)
}
/// Returns the index of the first record whose sequence number is greater
/// than `checkpoint`, comparing sequence numbers as strings.
///
/// Resolving by sequence number rather than by stored index keeps the result
/// valid after older records have been trimmed from the front of the shard.
fn kinesis_window_start(records: &[fakecloud_kinesis::KinesisRecord], checkpoint: &str) -> usize {
    records.partition_point(|record| {
        let already_delivered = record.sequence_number.as_str().cmp(checkpoint).is_le();
        already_delivered
    })
}
/// Returns the index of the first record a brand-new pipe should deliver.
///
/// * `LATEST` — `records.len()`, i.e. only records appended afterwards.
/// * `AT_TIMESTAMP` — the first record that arrived at or after
///   `starting_position_timestamp` (epoch seconds, possibly fractional; a
///   missing timestamp is treated as 0), or `records.len()` if none did.
/// * anything else, including `TRIM_HORIZON` and `None` — 0.
fn kinesis_start_index(
    records: &[fakecloud_kinesis::KinesisRecord],
    starting_position: Option<&str>,
    starting_position_timestamp: Option<f64>,
) -> usize {
    match starting_position.unwrap_or("TRIM_HORIZON") {
        "LATEST" => records.len(),
        "AT_TIMESTAMP" => {
            // Compare in milliseconds. Truncating both sides to whole seconds
            // would include records that arrived earlier within the same
            // second as a fractional target.
            let target_ms = starting_position_timestamp
                .map(|secs| (secs * 1000.0).round() as i64)
                .unwrap_or(0);
            records
                .iter()
                .position(|r| r.approximate_arrival_timestamp.timestamp_millis() >= target_ms)
                .unwrap_or(records.len())
        }
        _ => 0,
    }
}
/// Extracts the account id (fifth `:`-separated field) from an ARN, or an
/// empty string when the ARN has fewer than five fields.
fn arn_account(arn: &str) -> String {
    match arn.split(':').nth(4) {
        Some(account) => account.to_owned(),
        None => String::new(),
    }
}
/// Extracts the region (fourth `:`-separated field) from an ARN, defaulting
/// to `us-east-1` when the ARN has fewer than four fields.
fn arn_region(arn: &str) -> String {
    let region = arn.splitn(5, ':').nth(3);
    region.map_or_else(|| String::from("us-east-1"), str::to_owned)
}
/// Renders an event as the string payload sent to a target: JSON strings are
/// sent as their raw contents, everything else as serialized JSON.
fn event_to_payload(event: &Value) -> String {
    if let Some(text) = event.as_str() {
        return text.to_owned();
    }
    serde_json::to_string(event).unwrap_or_default()
}
/// Interprets an enrichment response as a list of events.
///
/// A JSON array is used as-is, `null` and the empty string yield no events,
/// any other JSON value becomes a single event, and unparsable bytes yield no
/// events.
fn enrichment_output(bytes: &[u8]) -> Vec<Value> {
    let Ok(parsed) = serde_json::from_slice::<Value>(bytes) else {
        return Vec::new();
    };
    match parsed {
        Value::Array(items) => items,
        Value::Null => Vec::new(),
        Value::String(s) if s.is_empty() => Vec::new(),
        single => vec![single],
    }
}
/// Applies an optional input template to an event.
///
/// With no template the event is cloned unchanged. Otherwise the template is
/// rendered; output that parses as JSON becomes that JSON value, and anything
/// else becomes a JSON string holding the rendered text.
fn apply_input_template(template: Option<&str>, event: &Value) -> Value {
    match template {
        None => event.clone(),
        Some(template) => {
            let rendered = render_template(template, event);
            serde_json::from_str::<Value>(&rendered).unwrap_or_else(|_| Value::String(rendered))
        }
    }
}
/// Expands every `<token>` placeholder in `template` against `event`.
///
/// Text outside placeholders is copied verbatim. A `<` with no closing `>`
/// after it is emitted literally.
fn render_template(template: &str, event: &Value) -> String {
    let mut rendered = String::new();
    let mut remaining = template;
    while let Some((literal, after_open)) = remaining.split_once('<') {
        rendered.push_str(literal);
        match after_open.split_once('>') {
            Some((token, tail)) => {
                rendered.push_str(&resolve_token(token.trim(), event));
                remaining = tail;
            }
            None => {
                rendered.push('<');
                remaining = after_open;
            }
        }
    }
    rendered.push_str(remaining);
    rendered
}
/// Resolves a single template token to its replacement text.
///
/// * `aws.pipes.event.json` — the whole event serialized as JSON.
/// * `$<path>` — the value at that path: strings are inserted raw, other
///   values as JSON, and a missing path as the empty string.
/// * anything else — echoed back as `<token>`.
fn resolve_token(token: &str, event: &Value) -> String {
    if token == "aws.pipes.event.json" {
        return serde_json::to_string(event).unwrap_or_default();
    }
    let Some(path) = token.strip_prefix('$') else {
        return format!("<{token}>");
    };
    match resolve_json_path(event, path) {
        None => String::new(),
        Some(Value::String(text)) => text,
        Some(value) => serde_json::to_string(&value).unwrap_or_default(),
    }
}
/// Walks a dot-separated `path` through `event`, skipping empty segments, and
/// returns a clone of the value found, or `None` if any segment is missing.
fn resolve_json_path(event: &Value, path: &str) -> Option<Value> {
    path.split('.')
        .filter(|segment| !segment.is_empty())
        .try_fold(event, |node, segment| node.get(segment))
        .cloned()
}
/// Builds the SQS source event for one message, using the supplied `body` in
/// place of the message's stored body.
///
/// `messageAttributes` is always emitted as an empty object.
fn sqs_source_event(
    msg: &fakecloud_sqs::SqsMessage,
    body: &str,
    source_arn: &str,
    region: &str,
) -> Value {
    let mut attributes: serde_json::Map<String, Value> = serde_json::Map::new();
    for (name, value) in msg.attributes.iter() {
        attributes.insert(name.clone(), Value::String(value.clone()));
    }
    let receipt_handle = msg.receipt_handle.clone().unwrap_or_default();
    json!({
        "messageId": msg.message_id,
        "receiptHandle": receipt_handle,
        "body": body,
        "attributes": attributes,
        "messageAttributes": {},
        "md5OfBody": msg.md5_of_body,
        "eventSource": "aws:sqs",
        "eventSourceArn": source_arn,
        "awsRegion": region,
    })
}
fn kinesis_source_event(
record: &fakecloud_kinesis::KinesisRecord,
shard_id: &str,
source_arn: &str,
region: &str,
) -> Value {
json!({
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": format!("{}:{}", shard_id, record.sequence_number),
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/pipes-role",
"awsRegion": region,
"eventSourceARN": source_arn,
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": record.partition_key,
"sequenceNumber": record.sequence_number,
"data": base64::engine::general_purpose::STANDARD.encode(&record.data),
"approximateArrivalTimestamp":
record.approximate_arrival_timestamp.timestamp_millis() as f64 / 1000.0,
}
})
}
/// Builds the DynamoDB stream source event for one stream record.
///
/// `NewImage` and `OldImage` are included only when the record carries them.
fn ddb_source_event(record: &fakecloud_dynamodb::StreamRecord) -> Value {
    let change = &record.dynamodb;
    let mut dynamodb = json!({
        "Keys": change.keys,
        "SequenceNumber": change.sequence_number,
        "SizeBytes": change.size_bytes,
        "StreamViewType": change.stream_view_type,
    });
    if let Some(new_image) = change.new_image.as_ref() {
        dynamodb["NewImage"] = json!(new_image);
    }
    if let Some(old_image) = change.old_image.as_ref() {
        dynamodb["OldImage"] = json!(old_image);
    }
    json!({
        "eventID": record.event_id,
        "eventName": record.event_name,
        "eventVersion": record.event_version,
        "eventSource": record.event_source,
        "awsRegion": record.aws_region,
        "dynamodb": dynamodb,
        "eventSourceARN": record.event_source_arn,
    })
}
/// Heuristically detects a fakecloud KMS envelope: either the
/// `fakecloud-kms:` prefix, or valid standard base64 whose decoded bytes
/// start with `01 02 02 00`.
fn looks_like_fakecloud_envelope(body: &str) -> bool {
    body.starts_with("fakecloud-kms:")
        || base64::engine::general_purpose::STANDARD
            .decode(body)
            .map(|bytes| bytes.starts_with(&[0x01, 0x02, 0x02, 0x00]))
            .unwrap_or(false)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn input_template_renders_json_object() {
        let event = json!({"body": "hello", "messageId": "m-1"});
        let tmpl = r#"{"text": "<$.body>", "id": "<$.messageId>"}"#;
        let out = apply_input_template(Some(tmpl), &event);
        assert_eq!(out, json!({"text": "hello", "id": "m-1"}));
    }

    #[test]
    fn input_template_whole_event_token() {
        let event = json!({"a": 1});
        let out = apply_input_template(Some("<aws.pipes.event.json>"), &event);
        assert_eq!(out, event);
    }

    #[test]
    fn input_template_non_string_value_is_injected_raw() {
        let event = json!({"count": 5, "nested": {"k": "v"}});
        let tmpl = r#"{"n": <$.count>, "obj": <$.nested>}"#;
        let out = apply_input_template(Some(tmpl), &event);
        assert_eq!(out, json!({"n": 5, "obj": {"k": "v"}}));
    }

    #[test]
    fn input_template_freeform_text_stays_string() {
        let event = json!({"name": "ada"});
        let out = apply_input_template(Some("hello <$.name>"), &event);
        assert_eq!(out, Value::String("hello ada".into()));
    }

    #[test]
    fn input_template_missing_path_substitutes_empty() {
        let event = json!({"a": 1});
        let out = apply_input_template(Some("x<$.missing>y"), &event);
        assert_eq!(out, Value::String("xy".into()));
    }

    #[test]
    fn no_template_passes_event_through() {
        let event = json!({"a": 1});
        assert_eq!(apply_input_template(None, &event), event);
    }

    #[test]
    fn enrichment_output_array_passthrough() {
        let bytes = br#"[{"a":1},{"a":2}]"#;
        assert_eq!(enrichment_output(bytes).len(), 2);
    }

    #[test]
    fn enrichment_output_single_object_wraps() {
        let bytes = br#"{"a":1}"#;
        assert_eq!(enrichment_output(bytes), vec![json!({"a": 1})]);
    }

    #[test]
    fn enrichment_output_null_drops() {
        assert!(enrichment_output(b"null").is_empty());
        assert!(enrichment_output(b"\"\"").is_empty());
        assert!(enrichment_output(b"not json").is_empty());
    }

    #[test]
    fn kinesis_start_index_respects_starting_position() {
        let records: Vec<fakecloud_kinesis::KinesisRecord> = Vec::new();
        assert_eq!(kinesis_start_index(&records, Some("LATEST"), None), 0);
        assert_eq!(kinesis_start_index(&records, Some("TRIM_HORIZON"), None), 0);
        assert_eq!(kinesis_start_index(&records, None, None), 0);
    }

    #[test]
    fn batch_size_clamps_per_source() {
        let params = json!({"SqsQueueParameters": {"BatchSize": 50}});
        assert_eq!(source_batch_size(Some(&params), &SourceKind::Sqs), 10);
        let params = json!({"KinesisStreamParameters": {"BatchSize": 500}});
        assert_eq!(source_batch_size(Some(&params), &SourceKind::Kinesis), 500);
    }

    /// Builds a minimal Kinesis record carrying only a sequence number.
    fn rec(seq: &str) -> fakecloud_kinesis::KinesisRecord {
        fakecloud_kinesis::KinesisRecord {
            sequence_number: seq.to_string(),
            partition_key: "pk".into(),
            data: Vec::new(),
            approximate_arrival_timestamp: chrono::DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
        }
    }

    #[test]
    fn kinesis_window_start_resolves_by_sequence_number() {
        let records = vec![rec("00001"), rec("00002"), rec("00003")];
        assert_eq!(kinesis_window_start(&records, ""), 0);
        assert_eq!(kinesis_window_start(&records, "00002"), 2);
        assert_eq!(kinesis_window_start(&records, "00003"), 3);
    }

    #[test]
    fn kinesis_window_start_survives_retention_trim() {
        let before = vec![rec("00001"), rec("00002"), rec("00003"), rec("00004")];
        assert_eq!(kinesis_window_start(&before, "00002"), 2);
        // After the first two records are trimmed, the same checkpoint must
        // resolve to the new front of the shard.
        let after = vec![rec("00003"), rec("00004")];
        assert_eq!(kinesis_window_start(&after, "00002"), 0);
        assert_eq!(
            after[kinesis_window_start(&after, "00002")].sequence_number,
            "00003"
        );
    }
}