1use crate::{
20 types::{
21 AsDisplayText, Command, CommandResponse, CommandResponseType, CommandResult,
22 CommandResultType, CommandType, Language,
23 },
24 Context, Cpo, Error, MpscCommandsHandler, Party, Result, Store,
25};
26use async_trait::async_trait;
27use tokio::{sync::oneshot, time};
28
29type Sender<T> = oneshot::Sender<T>;
30type Receiver<T> = oneshot::Receiver<T>;
31
32#[async_trait]
33pub trait CommandsModule {
34 async fn commands_post(
35 &self,
36 ctx: Context,
37 command: CommandType,
38 body: serde_json::Value,
39 ) -> Result<CommandResponse> {
40 let command = match command {
41 CommandType::CancelReservation => {
42 serde_json::from_value(body).map(Command::CancelReservation)?
43 }
44
45 CommandType::ReserveNow => serde_json::from_value(body).map(Command::ReserveNow)?,
46
47 CommandType::StartSession => serde_json::from_value(body).map(Command::StartSession)?,
48
49 CommandType::StopSession => serde_json::from_value(body).map(Command::StopSession)?,
50
51 CommandType::UnlockConnector => {
52 serde_json::from_value(body).map(Command::UnlockConnector)?
53 }
54 };
55
56 self.handle_command(ctx, command).await
57 }
58
59 async fn handle_command(&self, ctx: Context, command: Command) -> Result<CommandResponse>;
60}
61
62#[async_trait]
63impl<DB> CommandsModule for Cpo<DB, MpscCommandsHandler<DB::PartyModel>>
64where
65 DB: Store,
66{
67 async fn handle_command(&self, context: Context, command: Command) -> Result<CommandResponse> {
68 let party = self
69 .db
70 .get_authorized(context.credentials_token.clone())
71 .await?
72 .party()?;
73
74 let (tx, response_fut) = oneshot::channel();
75
76 let response_url = command.response_url().as_str().parse::<url::Url>()?;
77
78 let promise = ResponsePromise(tx);
79 let request = CommandRequest {
80 context: context.clone(),
81 command,
82 promise,
83 party: party.clone(),
84 };
85
86 self.commands_handler
87 .0
88 .send(request)
89 .await
90 .map_err(|err| Error::internal_server(err.to_string()))?;
91
92 let (result_fut, response) =
94 match time::timeout(time::Duration::from_secs(30), response_fut).await? {
95 Err(_) => {
97 return Ok(CommandResponseType::Rejected
98 .response(0, ["CPO ignored request".must_en()]))
99 }
100
101 Ok((_, Err(err))) => return Err(err),
103
104 Ok((_, Ok(response))) if response.result != CommandResponseType::Accepted => {
107 return Ok(response);
108 }
109
110 Ok((result_fut, Ok(response))) => (result_fut, response),
112 };
113
114 let client = self.client.clone();
115 let timeout = time::Duration::from_secs(response.timeout);
116
117 tokio::spawn(async move {
120 let result = time::timeout(timeout, result_fut)
121 .await
122 .unwrap_or_else(|_| Ok(CommandResultType::Timeout.without_message()))
123 .unwrap_or_else(|_| {
124 CommandResultType::Failed.with_message(
125 "Internal server error handling result".must_in_language(Language::En),
126 )
127 });
128
129 log::debug!(
130 "Posting Result: `{:?}` to `{}`",
131 result.result,
132 response_url
133 );
134
135 if let Err(err) = client
136 .post_response(context.extend(&party.token_we_use()), &response_url, result)
137 .await
138 {
139 log::error!("Failed to send result to `{}`: {}", response_url, err);
140 }
141 });
142
143 Ok(response)
144 }
145}
146
147pub struct CommandRequest<Party> {
148 pub context: Context,
149 pub command: Command,
150 pub party: Party,
151 pub promise: ResponsePromise,
152}
153
154pub struct ResponsePromise(Sender<(Receiver<CommandResult>, Result<CommandResponse>)>);
155
156impl ResponsePromise {
157 pub fn reply(self, res: Result<CommandResponse>) -> ResultPromise {
158 let (result_tx, result_rx) = oneshot::channel();
159
160 self.0
161 .send((result_rx, res))
162 .expect("Error sending response. Other end hung up");
163
164 ResultPromise(result_tx)
165 }
166}
167
168pub struct ResultPromise(Sender<CommandResult>);
169
170impl ResultPromise {
171 pub fn reply(self, res: CommandResult) {
172 self.0.send(res).expect("Sending result. Other end hung up");
173 }
174}