ocpi/
commands_module.rs

1//! 13. Commands module
2//!
3//! Module Identifier: commands
4//! Type: Functional Module
5//!
6//! The Commands module enables remote commands to be sent to a Location/EVSE.
7//! The following commands are supported:
8//! • RESERVE_NOW
9//! • CANCEL_RESERVATION
10//! • START_SESSION
11//! • STOP_SESSION
12//! • UNLOCK_CONNECTOR
13//!
14//! See CommandType for a description of the different commands.
15//! Use the UNLOCK_CONNECTOR command with care, please read the note at CommandType.
16//!
17//! Module dependency: Locations module, Sessions module
18
19use crate::{
20    types::{
21        AsDisplayText, Command, CommandResponse, CommandResponseType, CommandResult,
22        CommandResultType, CommandType, Language,
23    },
24    Context, Cpo, Error, MpscCommandsHandler, Party, Result, Store,
25};
26use async_trait::async_trait;
27use tokio::{sync::oneshot, time};
28
29type Sender<T> = oneshot::Sender<T>;
30type Receiver<T> = oneshot::Receiver<T>;
31
32#[async_trait]
33pub trait CommandsModule {
34    async fn commands_post(
35        &self,
36        ctx: Context,
37        command: CommandType,
38        body: serde_json::Value,
39    ) -> Result<CommandResponse> {
40        let command = match command {
41            CommandType::CancelReservation => {
42                serde_json::from_value(body).map(Command::CancelReservation)?
43            }
44
45            CommandType::ReserveNow => serde_json::from_value(body).map(Command::ReserveNow)?,
46
47            CommandType::StartSession => serde_json::from_value(body).map(Command::StartSession)?,
48
49            CommandType::StopSession => serde_json::from_value(body).map(Command::StopSession)?,
50
51            CommandType::UnlockConnector => {
52                serde_json::from_value(body).map(Command::UnlockConnector)?
53            }
54        };
55
56        self.handle_command(ctx, command).await
57    }
58
59    async fn handle_command(&self, ctx: Context, command: Command) -> Result<CommandResponse>;
60}
61
62#[async_trait]
63impl<DB> CommandsModule for Cpo<DB, MpscCommandsHandler<DB::PartyModel>>
64where
65    DB: Store,
66{
67    async fn handle_command(&self, context: Context, command: Command) -> Result<CommandResponse> {
68        let party = self
69            .db
70            .get_authorized(context.credentials_token.clone())
71            .await?
72            .party()?;
73
74        let (tx, response_fut) = oneshot::channel();
75
76        let response_url = command.response_url().as_str().parse::<url::Url>()?;
77
78        let promise = ResponsePromise(tx);
79        let request = CommandRequest {
80            context: context.clone(),
81            command,
82            promise,
83            party: party.clone(),
84        };
85
86        self.commands_handler
87            .0
88            .send(request)
89            .await
90            .map_err(|err| Error::internal_server(err.to_string()))?;
91
92        // the `?` handles the timeout using Errors `from` impl
93        let (result_fut, response) =
94            match time::timeout(time::Duration::from_secs(30), response_fut).await? {
95                // Oneshot channel closed without reply sent.
96                Err(_) => {
97                    return Ok(CommandResponseType::Rejected
98                        .response(0, ["CPO ignored request".must_en()]))
99                }
100
101                // Handling the request caused an Error.
102                Ok((_, Err(err))) => return Err(err),
103
104                // Response was successful, but not accepted.
105                // Because of this there wont be any result. So we can just return here.
106                Ok((_, Ok(response))) if response.result != CommandResponseType::Accepted => {
107                    return Ok(response);
108                }
109
110                // Response was successful, and accepted.
111                Ok((result_fut, Ok(response))) => (result_fut, response),
112            };
113
114        let client = self.client.clone();
115        let timeout = time::Duration::from_secs(response.timeout);
116
117        // Spawn a thing waiting for the reply.
118        // This will use the same timeout as replied with.
119        tokio::spawn(async move {
120            let result = time::timeout(timeout, result_fut)
121                .await
122                .unwrap_or_else(|_| Ok(CommandResultType::Timeout.without_message()))
123                .unwrap_or_else(|_| {
124                    CommandResultType::Failed.with_message(
125                        "Internal server error handling result".must_in_language(Language::En),
126                    )
127                });
128
129            log::debug!(
130                "Posting Result: `{:?}` to `{}`",
131                result.result,
132                response_url
133            );
134
135            if let Err(err) = client
136                .post_response(context.extend(&party.token_we_use()), &response_url, result)
137                .await
138            {
139                log::error!("Failed to send result to `{}`: {}", response_url, err);
140            }
141        });
142
143        Ok(response)
144    }
145}
146
147pub struct CommandRequest<Party> {
148    pub context: Context,
149    pub command: Command,
150    pub party: Party,
151    pub promise: ResponsePromise,
152}
153
154pub struct ResponsePromise(Sender<(Receiver<CommandResult>, Result<CommandResponse>)>);
155
156impl ResponsePromise {
157    pub fn reply(self, res: Result<CommandResponse>) -> ResultPromise {
158        let (result_tx, result_rx) = oneshot::channel();
159
160        self.0
161            .send((result_rx, res))
162            .expect("Error sending response. Other end hung up");
163
164        ResultPromise(result_tx)
165    }
166}
167
168pub struct ResultPromise(Sender<CommandResult>);
169
170impl ResultPromise {
171    pub fn reply(self, res: CommandResult) {
172        self.0.send(res).expect("Sending result. Other end hung up");
173    }
174}