ocpi 0.3.5

Unofficial, in progress, OCPI implementation
Documentation
//! 13. Commands module
//!
//! Module Identifier: commands
//! Type: Functional Module
//!
//! The Commands module enables remote commands to be sent to a Location/EVSE.
//! The following commands are supported:
//! • RESERVE_NOW
//! • CANCEL_RESERVATION
//! • START_SESSION
//! • STOP_SESSION
//! • UNLOCK_CONNECTOR
//!
//! See CommandType for a description of the different commands.
//! Use the UNLOCK_CONNECTOR command with care, please read the note at CommandType.
//!
//! Module dependency: Locations module, Sessions module

use crate::{
    types::{
        AsDisplayText, Command, CommandResponse, CommandResponseType, CommandResult,
        CommandResultType, CommandType, Language,
    },
    Context, Cpo, Error, MpscCommandsHandler, Party, Result, Store,
};
use async_trait::async_trait;
use tokio::{sync::oneshot, time};

type Sender<T> = oneshot::Sender<T>;
type Receiver<T> = oneshot::Receiver<T>;

#[async_trait]
pub trait CommandsModule {
    async fn commands_post(
        &self,
        ctx: Context,
        command: CommandType,
        body: serde_json::Value,
    ) -> Result<CommandResponse> {
        let command = match command {
            CommandType::CancelReservation => {
                serde_json::from_value(body).map(Command::CancelReservation)?
            }

            CommandType::ReserveNow => serde_json::from_value(body).map(Command::ReserveNow)?,

            CommandType::StartSession => serde_json::from_value(body).map(Command::StartSession)?,

            CommandType::StopSession => serde_json::from_value(body).map(Command::StopSession)?,

            CommandType::UnlockConnector => {
                serde_json::from_value(body).map(Command::UnlockConnector)?
            }
        };

        self.handle_command(ctx, command).await
    }

    async fn handle_command(&self, ctx: Context, command: Command) -> Result<CommandResponse>;
}

#[async_trait]
impl<DB> CommandsModule for Cpo<DB, MpscCommandsHandler<DB::PartyModel>>
where
    DB: Store,
{
    async fn handle_command(&self, context: Context, command: Command) -> Result<CommandResponse> {
        let party = self
            .db
            .get_authorized(context.credentials_token.clone())
            .await?
            .party()?;

        let (tx, response_fut) = oneshot::channel();

        let response_url = command.response_url().as_str().parse::<url::Url>()?;

        let promise = ResponsePromise(tx);
        let request = CommandRequest {
            context: context.clone(),
            command,
            promise,
            party: party.clone(),
        };

        self.commands_handler
            .0
            .send(request)
            .await
            .map_err(|err| Error::internal_server(err.to_string()))?;

        // the `?` handles the timeout using Errors `from` impl
        let (result_fut, response) =
            match time::timeout(time::Duration::from_secs(30), response_fut).await? {
                // Oneshot channel closed without reply sent.
                Err(_) => {
                    return Ok(CommandResponseType::Rejected
                        .response(0, ["CPO ignored request".must_en()]))
                }

                // Handling the request caused an Error.
                Ok((_, Err(err))) => return Err(err),

                // Response was successful, but not accepted.
                // Because of this there wont be any result. So we can just return here.
                Ok((_, Ok(response))) if response.result != CommandResponseType::Accepted => {
                    return Ok(response);
                }

                // Response was successful, and accepted.
                Ok((result_fut, Ok(response))) => (result_fut, response),
            };

        let client = self.client.clone();
        let timeout = time::Duration::from_secs(response.timeout);

        // Spawn a thing waiting for the reply.
        // This will use the same timeout as replied with.
        tokio::spawn(async move {
            let result = time::timeout(timeout, result_fut)
                .await
                .unwrap_or_else(|_| Ok(CommandResultType::Timeout.without_message()))
                .unwrap_or_else(|_| {
                    CommandResultType::Failed.with_message(
                        "Internal server error handling result".must_in_language(Language::En),
                    )
                });

            log::debug!(
                "Posting Result: `{:?}` to `{}`",
                result.result,
                response_url
            );

            if let Err(err) = client
                .post_response(context.extend(&party.token_we_use()), &response_url, result)
                .await
            {
                log::error!("Failed to send result to `{}`: {}", response_url, err);
            }
        });

        Ok(response)
    }
}

pub struct CommandRequest<Party> {
    pub context: Context,
    pub command: Command,
    pub party: Party,
    pub promise: ResponsePromise,
}

pub struct ResponsePromise(Sender<(Receiver<CommandResult>, Result<CommandResponse>)>);

impl ResponsePromise {
    pub fn reply(self, res: Result<CommandResponse>) -> ResultPromise {
        let (result_tx, result_rx) = oneshot::channel();

        self.0
            .send((result_rx, res))
            .expect("Error sending response. Other end hung up");

        ResultPromise(result_tx)
    }
}

pub struct ResultPromise(Sender<CommandResult>);

impl ResultPromise {
    pub fn reply(self, res: CommandResult) {
        self.0.send(res).expect("Sending result. Other end hung up");
    }
}