Skip to main content

awsim_pipes/
lib.rs

//! EventBridge Pipes emulator.
//!
//! Implements the control-plane state for Pipes (Create/Describe/List/Update/
//! Delete/Start/Stop, plus resource tagging) with RestJson1 routes. Actual
//! source→target dispatch is driven by a separate background runner spawned
//! in the awsim binary.
6
7mod operations;
8pub mod state;
9
10pub use state::{Pipe, PipesState};
11
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use awsim_core::{
16    AccountRegionStore, AwsError, Protocol, RequestContext, RouteDefinition, ServiceHandler,
17};
18use serde_json::Value;
19use tracing::debug;
20
21pub struct PipesService {
22    store: AccountRegionStore<PipesState>,
23}
24
25impl PipesService {
26    pub fn new() -> Self {
27        Self {
28            store: AccountRegionStore::new(),
29        }
30    }
31
32    pub fn store(&self) -> AccountRegionStore<PipesState> {
33        self.store.clone()
34    }
35
36    fn get_state(&self, ctx: &RequestContext) -> Arc<PipesState> {
37        self.store.get(&ctx.account_id, &ctx.region)
38    }
39}
40
41impl Default for PipesService {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47#[async_trait]
48impl ServiceHandler for PipesService {
49    fn service_name(&self) -> &str {
50        "pipes"
51    }
52
53    fn signing_name(&self) -> &str {
54        "pipes"
55    }
56
57    fn protocol(&self) -> Protocol {
58        Protocol::RestJson1
59    }
60
61    fn routes(&self) -> Vec<RouteDefinition> {
62        vec![
63            RouteDefinition {
64                method: "POST",
65                path_pattern: "/v1/pipes/{Name}",
66                operation: "CreatePipe",
67                required_query_param: None,
68            },
69            RouteDefinition {
70                method: "GET",
71                path_pattern: "/v1/pipes/{Name}",
72                operation: "DescribePipe",
73                required_query_param: None,
74            },
75            RouteDefinition {
76                method: "GET",
77                path_pattern: "/v1/pipes",
78                operation: "ListPipes",
79                required_query_param: None,
80            },
81            RouteDefinition {
82                method: "DELETE",
83                path_pattern: "/v1/pipes/{Name}",
84                operation: "DeletePipe",
85                required_query_param: None,
86            },
87            RouteDefinition {
88                method: "PUT",
89                path_pattern: "/v1/pipes/{Name}",
90                operation: "UpdatePipe",
91                required_query_param: None,
92            },
93            RouteDefinition {
94                method: "POST",
95                path_pattern: "/v1/pipes/{Name}/start",
96                operation: "StartPipe",
97                required_query_param: None,
98            },
99            RouteDefinition {
100                method: "POST",
101                path_pattern: "/v1/pipes/{Name}/stop",
102                operation: "StopPipe",
103                required_query_param: None,
104            },
105            RouteDefinition {
106                method: "GET",
107                path_pattern: "/tags/{ResourceArn}",
108                operation: "ListTagsForResource",
109                required_query_param: None,
110            },
111            RouteDefinition {
112                method: "POST",
113                path_pattern: "/tags/{ResourceArn}",
114                operation: "TagResource",
115                required_query_param: None,
116            },
117            RouteDefinition {
118                method: "DELETE",
119                path_pattern: "/tags/{ResourceArn}",
120                operation: "UntagResource",
121                required_query_param: None,
122            },
123        ]
124    }
125
126    async fn handle(
127        &self,
128        operation: &str,
129        input: Value,
130        ctx: &RequestContext,
131    ) -> Result<Value, AwsError> {
132        debug!(operation, "Pipes request");
133        let state = self.get_state(ctx);
134        match operation {
135            "CreatePipe" => operations::pipes::create_pipe(&state, &input, ctx),
136            "DescribePipe" => operations::pipes::describe_pipe(&state, &input, ctx),
137            "ListPipes" => operations::pipes::list_pipes(&state, &input, ctx),
138            "DeletePipe" => operations::pipes::delete_pipe(&state, &input, ctx),
139            "UpdatePipe" => operations::pipes::update_pipe(&state, &input, ctx),
140            "StartPipe" => operations::pipes::start_pipe(&state, &input, ctx),
141            "StopPipe" => operations::pipes::stop_pipe(&state, &input, ctx),
142            "ListTagsForResource" => operations::pipes::list_tags_for_resource(&state, &input, ctx),
143            "TagResource" => operations::tags::tag_resource(&state, &input, ctx),
144            "UntagResource" => operations::tags::untag_resource(&state, &input, ctx),
145            _ => Err(AwsError::unknown_operation(operation)),
146        }
147    }
148
149    fn snapshot(&self) -> Option<Vec<u8>> {
150        let mut all = state::PipesStateSnapshot { pipes: vec![] };
151        for (_, st) in self.store.iter_all() {
152            all.pipes.extend(st.to_snapshot().pipes);
153        }
154        serde_json::to_vec(&all).ok()
155    }
156
157    fn restore(&self, data: &[u8]) -> Result<(), String> {
158        let snap: state::PipesStateSnapshot =
159            serde_json::from_slice(data).map_err(|e| e.to_string())?;
160        let st = self.store.get("000000000000", "us-east-1");
161        st.restore_from_snapshot(snap);
162        Ok(())
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169    use serde_json::json;
170
171    fn ctx() -> RequestContext {
172        RequestContext::new("pipes", "us-east-1")
173    }
174
175    fn block_on<F: std::future::Future>(f: F) -> F::Output {
176        use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
177        fn noop_clone(_: *const ()) -> RawWaker {
178            noop_raw_waker()
179        }
180        fn noop(_: *const ()) {}
181        fn noop_raw_waker() -> RawWaker {
182            static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
183            RawWaker::new(std::ptr::null(), &VTABLE)
184        }
185        let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
186        let mut cx = Context::from_waker(&waker);
187        let mut fut = std::pin::pin!(f);
188        loop {
189            if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
190                return v;
191            }
192        }
193    }
194
195    #[test]
196    fn create_describe_list_delete_lifecycle() {
197        let svc = PipesService::new();
198        let ctx = ctx();
199
200        let created = block_on(svc.handle(
201            "CreatePipe",
202            json!({
203                "Name": "orders-pipe",
204                "Source": "arn:aws:sqs:us-east-1:000000000000:orders",
205                "Target": "arn:aws:lambda:us-east-1:000000000000:function:processor",
206                "RoleArn": "arn:aws:iam::000000000000:role/PipesRole",
207                "DesiredState": "RUNNING",
208                "Description": "test pipe",
209            }),
210            &ctx,
211        ))
212        .unwrap();
213        assert_eq!(created["CurrentState"], "RUNNING");
214        assert!(
215            created["Arn"]
216                .as_str()
217                .unwrap()
218                .ends_with(":pipe/orders-pipe")
219        );
220
221        let list = block_on(svc.handle("ListPipes", json!({}), &ctx)).unwrap();
222        assert_eq!(list["Pipes"].as_array().unwrap().len(), 1);
223
224        let stopped =
225            block_on(svc.handle("StopPipe", json!({ "Name": "orders-pipe" }), &ctx)).unwrap();
226        assert_eq!(stopped["CurrentState"], "STOPPED");
227
228        let described =
229            block_on(svc.handle("DescribePipe", json!({ "Name": "orders-pipe" }), &ctx)).unwrap();
230        assert_eq!(described["Description"], "test pipe");
231
232        let deleted =
233            block_on(svc.handle("DeletePipe", json!({ "Name": "orders-pipe" }), &ctx)).unwrap();
234        assert_eq!(deleted["CurrentState"], "DELETING");
235
236        let after = block_on(svc.handle("ListPipes", json!({}), &ctx)).unwrap();
237        assert!(after["Pipes"].as_array().unwrap().is_empty());
238    }
239
240    #[test]
241    fn create_duplicate_pipe_conflicts() {
242        let svc = PipesService::new();
243        let ctx = ctx();
244        let body = json!({
245            "Name": "p",
246            "Source": "arn:aws:sqs:us-east-1:000000000000:q",
247            "Target": "arn:aws:lambda:us-east-1:000000000000:function:f",
248            "RoleArn": "arn:aws:iam::000000000000:role/r",
249        });
250        block_on(svc.handle("CreatePipe", body.clone(), &ctx)).unwrap();
251        let err = block_on(svc.handle("CreatePipe", body, &ctx)).unwrap_err();
252        assert_eq!(err.code, "ConflictException");
253    }
254}