actix_amqp/server/
app.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use actix_router::{IntoPattern, Router};
6use actix_service::{boxed, fn_factory_with_config, IntoServiceFactory, Service, ServiceFactory};
7use amqp_codec::protocol::{DeliveryNumber, DeliveryState, Disposition, Error, Rejected, Role};
8use futures::future::{err, ok, Either, Ready};
9use futures::{Stream, StreamExt};
10
11use crate::cell::Cell;
12use crate::rcvlink::ReceiverLink;
13
14use super::errors::LinkError;
15use super::link::Link;
16use super::message::{Message, Outcome};
17use super::State;
18
19type Handle<S> = boxed::BoxServiceFactory<Link<S>, Message<S>, Outcome, Error, Error>;
20
21pub struct App<S = ()>(Vec<(Vec<String>, Handle<S>)>);
22
23impl<S: 'static> App<S> {
24    pub fn new() -> App<S> {
25        App(Vec::new())
26    }
27
28    pub fn service<T, F, U: 'static>(mut self, address: T, service: F) -> Self
29    where
30        T: IntoPattern,
31        F: IntoServiceFactory<U>,
32        U: ServiceFactory<Config = Link<S>, Request = Message<S>, Response = Outcome>,
33        U::Error: Into<Error>,
34        U::InitError: Into<Error>,
35    {
36        self.0.push((
37            address.patterns(),
38            boxed::factory(
39                service
40                    .into_factory()
41                    .map_init_err(|e| e.into())
42                    .map_err(|e| e.into()),
43            ),
44        ));
45
46        self
47    }
48
49    pub fn finish(
50        self,
51    ) -> impl ServiceFactory<
52        Config = State<S>,
53        Request = Link<S>,
54        Response = (),
55        Error = Error,
56        InitError = Error,
57    > {
58        let mut router = Router::build();
59        for (addr, hnd) in self.0 {
60            router.path(addr, hnd);
61        }
62        let router = Cell::new(router.finish());
63
64        fn_factory_with_config(move |_: State<S>| {
65            ok(AppService {
66                router: router.clone(),
67            })
68        })
69    }
70}
71
72struct AppService<S> {
73    router: Cell<Router<Handle<S>>>,
74}
75
76impl<S: 'static> Service for AppService<S> {
77    type Request = Link<S>;
78    type Response = ();
79    type Error = Error;
80    type Future = Either<Ready<Result<(), Error>>, AppServiceResponse<S>>;
81
82    fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
83        Poll::Ready(Ok(()))
84    }
85
86    fn call(&mut self, mut link: Link<S>) -> Self::Future {
87        let path = link
88            .frame()
89            .target
90            .as_ref()
91            .and_then(|target| target.address.as_ref().map(|addr| addr.clone()));
92
93        if let Some(path) = path {
94            link.path_mut().set(path);
95            if let Some((hnd, _info)) = self.router.recognize(link.path_mut()) {
96                let fut = hnd.new_service(link.clone());
97                Either::Right(AppServiceResponse {
98                    link: link.link.clone(),
99                    app_state: link.state.clone(),
100                    state: AppServiceResponseState::NewService(fut),
101                    // has_credit: true,
102                })
103            } else {
104                Either::Left(err(LinkError::force_detach()
105                    .description(format!(
106                        "Target address is not supported: {}",
107                        link.path().get_ref()
108                    ))
109                    .into()))
110            }
111        } else {
112            Either::Left(err(LinkError::force_detach()
113                .description("Target address is required")
114                .into()))
115        }
116    }
117}
118
119struct AppServiceResponse<S> {
120    link: ReceiverLink,
121    app_state: State<S>,
122    state: AppServiceResponseState<S>,
123    // has_credit: bool,
124}
125
126enum AppServiceResponseState<S> {
127    Service(boxed::BoxService<Message<S>, Outcome, Error>),
128    NewService(
129        Pin<Box<dyn Future<Output = Result<boxed::BoxService<Message<S>, Outcome, Error>, Error>>>>,
130    ),
131}
132
133impl<S> Future for AppServiceResponse<S> {
134    type Output = Result<(), Error>;
135
136    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
137        let mut this = self.as_mut();
138        let mut link = this.link.clone();
139        let app_state = this.app_state.clone();
140
141        loop {
142            match this.state {
143                AppServiceResponseState::Service(ref mut srv) => {
144                    // check readiness
145                    match srv.poll_ready(cx) {
146                        Poll::Ready(Ok(_)) => (),
147                        Poll::Pending => return Poll::Pending,
148                        Poll::Ready(Err(e)) => {
149                            let _ = this.link.close_with_error(
150                                LinkError::force_detach()
151                                    .description(format!("error: {}", e))
152                                    .into(),
153                            );
154                            return Poll::Ready(Ok(()));
155                        }
156                    }
157
158                    match Pin::new(&mut link).poll_next(cx) {
159                        Poll::Ready(Some(Ok(transfer))) => {
160                            // #2.7.5 delivery_id MUST be set. batching is not supported atm
161                            if transfer.delivery_id.is_none() {
162                                let _ = this.link.close_with_error(
163                                    LinkError::force_detach()
164                                        .description("delivery_id MUST be set")
165                                        .into(),
166                                );
167                                return Poll::Ready(Ok(()));
168                            }
169                            if link.credit() == 0 {
170                                // self.has_credit = self.link.credit() != 0;
171                                link.set_link_credit(50);
172                            }
173
174                            let delivery_id = transfer.delivery_id.unwrap();
175                            let msg = Message::new(app_state.clone(), transfer, link.clone());
176
177                            let mut fut = srv.call(msg);
178                            match Pin::new(&mut fut).poll(cx) {
179                                Poll::Ready(Ok(outcome)) => settle(
180                                    &mut this.link,
181                                    delivery_id,
182                                    outcome.into_delivery_state(),
183                                ),
184                                Poll::Pending => {
185                                    actix_rt::spawn(HandleMessage {
186                                        fut,
187                                        delivery_id,
188                                        link: this.link.clone(),
189                                    });
190                                }
191                                Poll::Ready(Err(e)) => settle(
192                                    &mut this.link,
193                                    delivery_id,
194                                    DeliveryState::Rejected(Rejected { error: Some(e) }),
195                                ),
196                            }
197                        }
198                        Poll::Ready(None) => return Poll::Ready(Ok(())),
199                        Poll::Pending => return Poll::Pending,
200                        Poll::Ready(Some(Err(_))) => {
201                            let _ = this.link.close_with_error(LinkError::force_detach().into());
202                            return Poll::Ready(Ok(()));
203                        }
204                    }
205                }
206                AppServiceResponseState::NewService(ref mut fut) => match Pin::new(fut).poll(cx) {
207                    Poll::Ready(Ok(srv)) => {
208                        this.link.open();
209                        this.link.set_link_credit(50);
210                        this.state = AppServiceResponseState::Service(srv);
211                        continue;
212                    }
213                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
214                    Poll::Pending => return Poll::Pending,
215                },
216            }
217        }
218    }
219}
220
221struct HandleMessage {
222    link: ReceiverLink,
223    delivery_id: DeliveryNumber,
224    fut: Pin<Box<dyn Future<Output = Result<Outcome, Error>>>>,
225}
226
227impl Future for HandleMessage {
228    type Output = ();
229
230    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
231        let mut this = self.as_mut();
232
233        match Pin::new(&mut this.fut).poll(cx) {
234            Poll::Ready(Ok(outcome)) => {
235                let delivery_id = this.delivery_id;
236                settle(&mut this.link, delivery_id, outcome.into_delivery_state());
237                Poll::Ready(())
238            }
239            Poll::Pending => Poll::Pending,
240            Poll::Ready(Err(e)) => {
241                let delivery_id = this.delivery_id;
242                settle(
243                    &mut this.link,
244                    delivery_id,
245                    DeliveryState::Rejected(Rejected { error: Some(e) }),
246                );
247                Poll::Ready(())
248            }
249        }
250    }
251}
252
253fn settle(link: &mut ReceiverLink, id: DeliveryNumber, state: DeliveryState) {
254    let disposition = Disposition {
255        state: Some(state),
256        role: Role::Receiver,
257        first: id,
258        last: None,
259        settled: true,
260        batchable: false,
261    };
262    link.send_disposition(disposition);
263}