1mod operations;
8pub mod state;
9
10pub use state::{Pipe, PipesState};
11
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use awsim_core::{
16 AccountRegionStore, AwsError, Protocol, RequestContext, RouteDefinition, ServiceHandler,
17};
18use serde_json::Value;
19use tracing::debug;
20
21pub struct PipesService {
22 store: AccountRegionStore<PipesState>,
23}
24
25impl PipesService {
26 pub fn new() -> Self {
27 Self {
28 store: AccountRegionStore::new(),
29 }
30 }
31
32 pub fn store(&self) -> AccountRegionStore<PipesState> {
33 self.store.clone()
34 }
35
36 fn get_state(&self, ctx: &RequestContext) -> Arc<PipesState> {
37 self.store.get(&ctx.account_id, &ctx.region)
38 }
39}
40
41impl Default for PipesService {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47#[async_trait]
48impl ServiceHandler for PipesService {
49 fn service_name(&self) -> &str {
50 "pipes"
51 }
52
53 fn signing_name(&self) -> &str {
54 "pipes"
55 }
56
57 fn protocol(&self) -> Protocol {
58 Protocol::RestJson1
59 }
60
61 fn routes(&self) -> Vec<RouteDefinition> {
62 vec![
63 RouteDefinition {
64 method: "POST",
65 path_pattern: "/v1/pipes/{Name}",
66 operation: "CreatePipe",
67 required_query_param: None,
68 },
69 RouteDefinition {
70 method: "GET",
71 path_pattern: "/v1/pipes/{Name}",
72 operation: "DescribePipe",
73 required_query_param: None,
74 },
75 RouteDefinition {
76 method: "GET",
77 path_pattern: "/v1/pipes",
78 operation: "ListPipes",
79 required_query_param: None,
80 },
81 RouteDefinition {
82 method: "DELETE",
83 path_pattern: "/v1/pipes/{Name}",
84 operation: "DeletePipe",
85 required_query_param: None,
86 },
87 RouteDefinition {
88 method: "PUT",
89 path_pattern: "/v1/pipes/{Name}",
90 operation: "UpdatePipe",
91 required_query_param: None,
92 },
93 RouteDefinition {
94 method: "POST",
95 path_pattern: "/v1/pipes/{Name}/start",
96 operation: "StartPipe",
97 required_query_param: None,
98 },
99 RouteDefinition {
100 method: "POST",
101 path_pattern: "/v1/pipes/{Name}/stop",
102 operation: "StopPipe",
103 required_query_param: None,
104 },
105 RouteDefinition {
106 method: "GET",
107 path_pattern: "/tags/{ResourceArn}",
108 operation: "ListTagsForResource",
109 required_query_param: None,
110 },
111 RouteDefinition {
112 method: "POST",
113 path_pattern: "/tags/{ResourceArn}",
114 operation: "TagResource",
115 required_query_param: None,
116 },
117 RouteDefinition {
118 method: "DELETE",
119 path_pattern: "/tags/{ResourceArn}",
120 operation: "UntagResource",
121 required_query_param: None,
122 },
123 ]
124 }
125
126 async fn handle(
127 &self,
128 operation: &str,
129 input: Value,
130 ctx: &RequestContext,
131 ) -> Result<Value, AwsError> {
132 debug!(operation, "Pipes request");
133 let state = self.get_state(ctx);
134 match operation {
135 "CreatePipe" => operations::pipes::create_pipe(&state, &input, ctx),
136 "DescribePipe" => operations::pipes::describe_pipe(&state, &input, ctx),
137 "ListPipes" => operations::pipes::list_pipes(&state, &input, ctx),
138 "DeletePipe" => operations::pipes::delete_pipe(&state, &input, ctx),
139 "UpdatePipe" => operations::pipes::update_pipe(&state, &input, ctx),
140 "StartPipe" => operations::pipes::start_pipe(&state, &input, ctx),
141 "StopPipe" => operations::pipes::stop_pipe(&state, &input, ctx),
142 "ListTagsForResource" => operations::pipes::list_tags_for_resource(&state, &input, ctx),
143 "TagResource" => operations::tags::tag_resource(&state, &input, ctx),
144 "UntagResource" => operations::tags::untag_resource(&state, &input, ctx),
145 _ => Err(AwsError::unknown_operation(operation)),
146 }
147 }
148
149 fn snapshot(&self) -> Option<Vec<u8>> {
150 let mut all = state::PipesStateSnapshot { pipes: vec![] };
151 for (_, st) in self.store.iter_all() {
152 all.pipes.extend(st.to_snapshot().pipes);
153 }
154 serde_json::to_vec(&all).ok()
155 }
156
157 fn restore(&self, data: &[u8]) -> Result<(), String> {
158 let snap: state::PipesStateSnapshot =
159 serde_json::from_slice(data).map_err(|e| e.to_string())?;
160 let st = self.store.get("000000000000", "us-east-1");
161 st.restore_from_snapshot(snap);
162 Ok(())
163 }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Request context shared by the tests: service `pipes`, region `us-east-1`.
    fn ctx() -> RequestContext {
        RequestContext::new("pipes", "us-east-1")
    }

    /// Runs `f` to completion on the current thread by polling it in a loop.
    ///
    /// The waker does nothing, so a pending future is simply polled again.
    fn block_on<F: std::future::Future>(f: F) -> F::Output {
        use std::sync::Arc;
        use std::task::{Context, Poll, Wake, Waker};

        /// Waker whose wake-up is a no-op.
        struct NoopWake;

        impl Wake for NoopWake {
            fn wake(self: Arc<Self>) {}
        }

        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let mut fut = std::pin::pin!(f);
        loop {
            match fut.as_mut().poll(&mut cx) {
                Poll::Ready(output) => break output,
                Poll::Pending => continue,
            }
        }
    }

    /// Sends one operation through `PipesService::handle` and waits for the result.
    fn call(
        svc: &PipesService,
        ctx: &RequestContext,
        operation: &str,
        input: Value,
    ) -> Result<Value, AwsError> {
        block_on(svc.handle(operation, input, ctx))
    }

    #[test]
    fn create_describe_list_delete_lifecycle() {
        let svc = PipesService::new();
        let ctx = ctx();
        let by_name = || json!({ "Name": "orders-pipe" });

        // Create: the pipe reports the requested state and an ARN ending in its name.
        let create_body = json!({
            "Name": "orders-pipe",
            "Source": "arn:aws:sqs:us-east-1:000000000000:orders",
            "Target": "arn:aws:lambda:us-east-1:000000000000:function:processor",
            "RoleArn": "arn:aws:iam::000000000000:role/PipesRole",
            "DesiredState": "RUNNING",
            "Description": "test pipe",
        });
        let created = call(&svc, &ctx, "CreatePipe", create_body).unwrap();
        assert_eq!(created["CurrentState"], "RUNNING");
        let arn = created["Arn"].as_str().unwrap();
        assert!(arn.ends_with(":pipe/orders-pipe"));

        // List: exactly the pipe that was just created.
        let list = call(&svc, &ctx, "ListPipes", json!({})).unwrap();
        assert_eq!(list["Pipes"].as_array().unwrap().len(), 1);

        // Stop.
        let stopped = call(&svc, &ctx, "StopPipe", by_name()).unwrap();
        assert_eq!(stopped["CurrentState"], "STOPPED");

        // Describe: the description survives the state change.
        let described = call(&svc, &ctx, "DescribePipe", by_name()).unwrap();
        assert_eq!(described["Description"], "test pipe");

        // Delete, then the listing is empty again.
        let deleted = call(&svc, &ctx, "DeletePipe", by_name()).unwrap();
        assert_eq!(deleted["CurrentState"], "DELETING");

        let after = call(&svc, &ctx, "ListPipes", json!({})).unwrap();
        assert!(after["Pipes"].as_array().unwrap().is_empty());
    }

    #[test]
    fn create_duplicate_pipe_conflicts() {
        let svc = PipesService::new();
        let ctx = ctx();
        let body = json!({
            "Name": "p",
            "Source": "arn:aws:sqs:us-east-1:000000000000:q",
            "Target": "arn:aws:lambda:us-east-1:000000000000:function:f",
            "RoleArn": "arn:aws:iam::000000000000:role/r",
        });

        // The first creation succeeds; repeating it with the same name must fail.
        call(&svc, &ctx, "CreatePipe", body.clone()).unwrap();
        let err = call(&svc, &ctx, "CreatePipe", body).unwrap_err();
        assert_eq!(err.code, "ConflictException");
    }
}