use crate::{
types::{
AsDisplayText, Command, CommandResponse, CommandResponseType, CommandResult,
CommandResultType, CommandType, Language,
},
Context, Cpo, Error, MpscCommandsHandler, Party, Result, Store,
};
use async_trait::async_trait;
use tokio::{sync::oneshot, time};
type Sender<T> = oneshot::Sender<T>;
type Receiver<T> = oneshot::Receiver<T>;
#[async_trait]
pub trait CommandsModule {
async fn commands_post(
&self,
ctx: Context,
command: CommandType,
body: serde_json::Value,
) -> Result<CommandResponse> {
let command = match command {
CommandType::CancelReservation => {
serde_json::from_value(body).map(Command::CancelReservation)?
}
CommandType::ReserveNow => serde_json::from_value(body).map(Command::ReserveNow)?,
CommandType::StartSession => serde_json::from_value(body).map(Command::StartSession)?,
CommandType::StopSession => serde_json::from_value(body).map(Command::StopSession)?,
CommandType::UnlockConnector => {
serde_json::from_value(body).map(Command::UnlockConnector)?
}
};
self.handle_command(ctx, command).await
}
async fn handle_command(&self, ctx: Context, command: Command) -> Result<CommandResponse>;
}
#[async_trait]
impl<DB> CommandsModule for Cpo<DB, MpscCommandsHandler<DB::PartyModel>>
where
DB: Store,
{
async fn handle_command(&self, context: Context, command: Command) -> Result<CommandResponse> {
let party = self
.db
.get_authorized(context.credentials_token.clone())
.await?
.party()?;
let (tx, response_fut) = oneshot::channel();
let response_url = command.response_url().as_str().parse::<url::Url>()?;
let promise = ResponsePromise(tx);
let request = CommandRequest {
context: context.clone(),
command,
promise,
party: party.clone(),
};
self.commands_handler
.0
.send(request)
.await
.map_err(|err| Error::internal_server(err.to_string()))?;
let (result_fut, response) =
match time::timeout(time::Duration::from_secs(30), response_fut).await? {
Err(_) => {
return Ok(CommandResponseType::Rejected
.response(0, ["CPO ignored request".must_en()]))
}
Ok((_, Err(err))) => return Err(err),
Ok((_, Ok(response))) if response.result != CommandResponseType::Accepted => {
return Ok(response);
}
Ok((result_fut, Ok(response))) => (result_fut, response),
};
let client = self.client.clone();
let timeout = time::Duration::from_secs(response.timeout);
tokio::spawn(async move {
let result = time::timeout(timeout, result_fut)
.await
.unwrap_or_else(|_| Ok(CommandResultType::Timeout.without_message()))
.unwrap_or_else(|_| {
CommandResultType::Failed.with_message(
"Internal server error handling result".must_in_language(Language::En),
)
});
log::debug!(
"Posting Result: `{:?}` to `{}`",
result.result,
response_url
);
if let Err(err) = client
.post_response(context.extend(&party.token_we_use()), &response_url, result)
.await
{
log::error!("Failed to send result to `{}`: {}", response_url, err);
}
});
Ok(response)
}
}
pub struct CommandRequest<Party> {
pub context: Context,
pub command: Command,
pub party: Party,
pub promise: ResponsePromise,
}
pub struct ResponsePromise(Sender<(Receiver<CommandResult>, Result<CommandResponse>)>);
impl ResponsePromise {
pub fn reply(self, res: Result<CommandResponse>) -> ResultPromise {
let (result_tx, result_rx) = oneshot::channel();
self.0
.send((result_rx, res))
.expect("Error sending response. Other end hung up");
ResultPromise(result_tx)
}
}
pub struct ResultPromise(Sender<CommandResult>);
impl ResultPromise {
pub fn reply(self, res: CommandResult) {
self.0.send(res).expect("Sending result. Other end hung up");
}
}