1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_router::{IntoPattern, Router};
6use actix_service::{boxed, fn_factory_with_config, IntoServiceFactory, Service, ServiceFactory};
7use amqp_codec::protocol::{DeliveryNumber, DeliveryState, Disposition, Error, Rejected, Role};
8use futures::future::{err, ok, Either, Ready};
9use futures::{Stream, StreamExt};
10
11use crate::cell::Cell;
12use crate::rcvlink::ReceiverLink;
13
14use super::errors::LinkError;
15use super::link::Link;
16use super::message::{Message, Outcome};
17use super::State;
18
19type Handle<S> = boxed::BoxServiceFactory<Link<S>, Message<S>, Outcome, Error, Error>;
20
21pub struct App<S = ()>(Vec<(Vec<String>, Handle<S>)>);
22
23impl<S: 'static> App<S> {
24 pub fn new() -> App<S> {
25 App(Vec::new())
26 }
27
28 pub fn service<T, F, U: 'static>(mut self, address: T, service: F) -> Self
29 where
30 T: IntoPattern,
31 F: IntoServiceFactory<U>,
32 U: ServiceFactory<Config = Link<S>, Request = Message<S>, Response = Outcome>,
33 U::Error: Into<Error>,
34 U::InitError: Into<Error>,
35 {
36 self.0.push((
37 address.patterns(),
38 boxed::factory(
39 service
40 .into_factory()
41 .map_init_err(|e| e.into())
42 .map_err(|e| e.into()),
43 ),
44 ));
45
46 self
47 }
48
49 pub fn finish(
50 self,
51 ) -> impl ServiceFactory<
52 Config = State<S>,
53 Request = Link<S>,
54 Response = (),
55 Error = Error,
56 InitError = Error,
57 > {
58 let mut router = Router::build();
59 for (addr, hnd) in self.0 {
60 router.path(addr, hnd);
61 }
62 let router = Cell::new(router.finish());
63
64 fn_factory_with_config(move |_: State<S>| {
65 ok(AppService {
66 router: router.clone(),
67 })
68 })
69 }
70}
71
72struct AppService<S> {
73 router: Cell<Router<Handle<S>>>,
74}
75
76impl<S: 'static> Service for AppService<S> {
77 type Request = Link<S>;
78 type Response = ();
79 type Error = Error;
80 type Future = Either<Ready<Result<(), Error>>, AppServiceResponse<S>>;
81
82 fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
83 Poll::Ready(Ok(()))
84 }
85
86 fn call(&mut self, mut link: Link<S>) -> Self::Future {
87 let path = link
88 .frame()
89 .target
90 .as_ref()
91 .and_then(|target| target.address.as_ref().map(|addr| addr.clone()));
92
93 if let Some(path) = path {
94 link.path_mut().set(path);
95 if let Some((hnd, _info)) = self.router.recognize(link.path_mut()) {
96 let fut = hnd.new_service(link.clone());
97 Either::Right(AppServiceResponse {
98 link: link.link.clone(),
99 app_state: link.state.clone(),
100 state: AppServiceResponseState::NewService(fut),
101 })
103 } else {
104 Either::Left(err(LinkError::force_detach()
105 .description(format!(
106 "Target address is not supported: {}",
107 link.path().get_ref()
108 ))
109 .into()))
110 }
111 } else {
112 Either::Left(err(LinkError::force_detach()
113 .description("Target address is required")
114 .into()))
115 }
116 }
117}
118
119struct AppServiceResponse<S> {
120 link: ReceiverLink,
121 app_state: State<S>,
122 state: AppServiceResponseState<S>,
123 }
125
126enum AppServiceResponseState<S> {
127 Service(boxed::BoxService<Message<S>, Outcome, Error>),
128 NewService(
129 Pin<Box<dyn Future<Output = Result<boxed::BoxService<Message<S>, Outcome, Error>, Error>>>>,
130 ),
131}
132
133impl<S> Future for AppServiceResponse<S> {
134 type Output = Result<(), Error>;
135
136 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
137 let mut this = self.as_mut();
138 let mut link = this.link.clone();
139 let app_state = this.app_state.clone();
140
141 loop {
142 match this.state {
143 AppServiceResponseState::Service(ref mut srv) => {
144 match srv.poll_ready(cx) {
146 Poll::Ready(Ok(_)) => (),
147 Poll::Pending => return Poll::Pending,
148 Poll::Ready(Err(e)) => {
149 let _ = this.link.close_with_error(
150 LinkError::force_detach()
151 .description(format!("error: {}", e))
152 .into(),
153 );
154 return Poll::Ready(Ok(()));
155 }
156 }
157
158 match Pin::new(&mut link).poll_next(cx) {
159 Poll::Ready(Some(Ok(transfer))) => {
160 if transfer.delivery_id.is_none() {
162 let _ = this.link.close_with_error(
163 LinkError::force_detach()
164 .description("delivery_id MUST be set")
165 .into(),
166 );
167 return Poll::Ready(Ok(()));
168 }
169 if link.credit() == 0 {
170 link.set_link_credit(50);
172 }
173
174 let delivery_id = transfer.delivery_id.unwrap();
175 let msg = Message::new(app_state.clone(), transfer, link.clone());
176
177 let mut fut = srv.call(msg);
178 match Pin::new(&mut fut).poll(cx) {
179 Poll::Ready(Ok(outcome)) => settle(
180 &mut this.link,
181 delivery_id,
182 outcome.into_delivery_state(),
183 ),
184 Poll::Pending => {
185 actix_rt::spawn(HandleMessage {
186 fut,
187 delivery_id,
188 link: this.link.clone(),
189 });
190 }
191 Poll::Ready(Err(e)) => settle(
192 &mut this.link,
193 delivery_id,
194 DeliveryState::Rejected(Rejected { error: Some(e) }),
195 ),
196 }
197 }
198 Poll::Ready(None) => return Poll::Ready(Ok(())),
199 Poll::Pending => return Poll::Pending,
200 Poll::Ready(Some(Err(_))) => {
201 let _ = this.link.close_with_error(LinkError::force_detach().into());
202 return Poll::Ready(Ok(()));
203 }
204 }
205 }
206 AppServiceResponseState::NewService(ref mut fut) => match Pin::new(fut).poll(cx) {
207 Poll::Ready(Ok(srv)) => {
208 this.link.open();
209 this.link.set_link_credit(50);
210 this.state = AppServiceResponseState::Service(srv);
211 continue;
212 }
213 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
214 Poll::Pending => return Poll::Pending,
215 },
216 }
217 }
218 }
219}
220
221struct HandleMessage {
222 link: ReceiverLink,
223 delivery_id: DeliveryNumber,
224 fut: Pin<Box<dyn Future<Output = Result<Outcome, Error>>>>,
225}
226
227impl Future for HandleMessage {
228 type Output = ();
229
230 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
231 let mut this = self.as_mut();
232
233 match Pin::new(&mut this.fut).poll(cx) {
234 Poll::Ready(Ok(outcome)) => {
235 let delivery_id = this.delivery_id;
236 settle(&mut this.link, delivery_id, outcome.into_delivery_state());
237 Poll::Ready(())
238 }
239 Poll::Pending => Poll::Pending,
240 Poll::Ready(Err(e)) => {
241 let delivery_id = this.delivery_id;
242 settle(
243 &mut this.link,
244 delivery_id,
245 DeliveryState::Rejected(Rejected { error: Some(e) }),
246 );
247 Poll::Ready(())
248 }
249 }
250 }
251}
252
253fn settle(link: &mut ReceiverLink, id: DeliveryNumber, state: DeliveryState) {
254 let disposition = Disposition {
255 state: Some(state),
256 role: Role::Receiver,
257 first: id,
258 last: None,
259 settled: true,
260 batchable: false,
261 };
262 link.send_disposition(disposition);
263}