1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
use std::{fmt, time::Duration};

use mpd_protocol::{
    command::{Command as RawCommand, CommandList as RawCommandList},
    response::Response,
    AsyncConnection, MpdProtocolError,
};
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::mpsc::{UnboundedReceiver, UnboundedSender},
    time::timeout,
};
use tracing::{debug, error, span, trace, Instrument, Level};

use crate::client::{CommandResponder, ConnectionError, ConnectionEvent, Subsystem};

struct State<C> {
    loop_state: LoopState,
    connection: AsyncConnection<C>,
    commands: UnboundedReceiver<(RawCommandList, CommandResponder)>,
    events: UnboundedSender<ConnectionEvent>,
}

enum LoopState {
    Idling,
    WaitingForCommandReply(CommandResponder),
}

impl fmt::Debug for LoopState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // avoid Debug-printing the noisy internals of the contained channel type
        match self {
            LoopState::Idling => write!(f, "Idling"),
            LoopState::WaitingForCommandReply(_) => write!(f, "WaitingForCommandReply"),
        }
    }
}

fn idle() -> RawCommand {
    RawCommand::new("idle")
}

fn cancel_idle() -> RawCommand {
    RawCommand::new("noidle")
}

pub(super) async fn run_loop<C>(
    mut connection: AsyncConnection<C>,
    commands: UnboundedReceiver<(RawCommandList, CommandResponder)>,
    events: UnboundedSender<ConnectionEvent>,
) where
    C: AsyncRead + AsyncWrite + Unpin,
{
    trace!("sending initial idle command");
    if let Err(e) = connection.send(idle()).await {
        error!(error = ?e, "failed to send initial idle command");
        let _ = events.send(ConnectionEvent::ConnectionClosed(e.into()));
        return;
    }

    let mut state = State {
        loop_state: LoopState::Idling,
        connection,
        commands,
        events,
    };

    trace!("entering run loop");

    loop {
        let span = span!(Level::TRACE, "iteration", state = ?state.loop_state);

        match run_loop_iteration(state).instrument(span).await {
            Ok(new_state) => state = new_state,
            Err(()) => break,
        }
    }

    trace!("exited run_loop");
}

/// Time to wait for another command to send before starting the idle loop.
const NEXT_COMMAND_IDLE_TIMEOUT: Duration = Duration::from_millis(100);

async fn run_loop_iteration<C>(mut state: State<C>) -> Result<State<C>, ()>
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    match state.loop_state {
        LoopState::Idling => {
            // We are idling (the last command sent to the server was an IDLE).

            // Wait for either a command to send or a message from the server, which would be a
            // state change notification.
            tokio::select! {
                response = state.connection.receive() => {
                    handle_idle_response(&mut state, response).await?;
                }
                command = state.commands.recv() => {
                    handle_command(&mut state, command).await?;
                }
            }
        }
        LoopState::WaitingForCommandReply(responder) => {
            // We're waiting for the response to the command associated with `responder`.

            let response = state.connection.receive().await.transpose().ok_or(())?;
            trace!("response to command received");

            let _ = responder.send(response.map_err(Into::into));

            let next_command = timeout(NEXT_COMMAND_IDLE_TIMEOUT, state.commands.recv());

            // See if we can immediately send the next command
            match next_command.await {
                Ok(Some((command, responder))) => {
                    trace!(?command, "next command immediately available");
                    match state.connection.send_list(command).await {
                        Ok(_) => state.loop_state = LoopState::WaitingForCommandReply(responder),
                        Err(e) => {
                            error!(error = ?e, "failed to send command");
                            let _ = responder.send(Err(e.into()));
                            return Err(());
                        }
                    }
                }
                Ok(None) => return Err(()),
                Err(_) => {
                    trace!("reached next command timeout, idling");

                    // Start idling again
                    state.loop_state = LoopState::Idling;
                    if let Err(e) = state.connection.send(idle()).await {
                        error!(error = ?e, "failed to start idling after receiving command response");
                        let _ = state
                            .events
                            .send(ConnectionEvent::ConnectionClosed(e.into()));
                        return Err(());
                    }
                }
            }
        }
    }

    Ok(state)
}

async fn handle_command<C>(
    state: &mut State<C>,
    command: Option<(RawCommandList, CommandResponder)>,
) -> Result<(), ()>
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    let (command, responder) = command.ok_or(())?;
    trace!(?command, "command received");

    // Cancel currently ongoing idle
    if let Err(e) = state.connection.send(cancel_idle()).await {
        error!(error = ?e, "failed to cancel idle prior to sending command");
        let _ = responder.send(Err(e.into()));
        return Err(());
    }

    // Receive the response to the cancellation
    match state.connection.receive().await {
        Ok(None) => return Err(()),
        Ok(Some(res)) => match res.into_single_frame() {
            Ok(f) => {
                if let Some(subsystem) = Subsystem::from_frame(f) {
                    debug!(?subsystem, "state change");
                    let _ = state
                        .events
                        .send(ConnectionEvent::SubsystemChange(subsystem));
                }
            }
            Err(e) => {
                error!(
                    code = e.code,
                    message = e.message,
                    "idle cancel returned an error"
                );
                let _ = state.events.send(ConnectionEvent::ConnectionClosed(
                    ConnectionError::InvalidResponse,
                ));
                return Err(());
            }
        },
        Err(e) => {
            error!(error = ?e, "state change error prior to sending command");
            let _ = responder.send(Err(e.into()));
            return Err(());
        }
    }

    // Actually send the command. This sets the state for the next loop
    // iteration.
    match state.connection.send_list(command).await {
        Ok(_) => state.loop_state = LoopState::WaitingForCommandReply(responder),
        Err(e) => {
            error!(error = ?e, "failed to send command");
            let _ = responder.send(Err(e.into()));
            return Err(());
        }
    }

    trace!("command sent successfully");
    Ok(())
}

async fn handle_idle_response<C>(
    state: &mut State<C>,
    response: Result<Option<Response>, MpdProtocolError>,
) -> Result<(), ()>
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    trace!("handling idle response");

    match response {
        Ok(Some(res)) => {
            match res.into_single_frame() {
                Ok(f) => {
                    if let Some(subsystem) = Subsystem::from_frame(f) {
                        debug!(?subsystem, "state change");
                        let _ = state
                            .events
                            .send(ConnectionEvent::SubsystemChange(subsystem));
                    }
                }
                Err(e) => {
                    error!(code = e.code, message = e.message, "idle returned an error");
                    let _ = state.events.send(ConnectionEvent::ConnectionClosed(
                        ConnectionError::InvalidResponse,
                    ));
                    return Err(());
                }
            }

            if let Err(e) = state.connection.send(idle()).await {
                error!(error = ?e, "failed to start idling after state change");
                let _ = state
                    .events
                    .send(ConnectionEvent::ConnectionClosed(e.into()));
                return Err(());
            }
        }
        Ok(None) => return Err(()), // The connection was closed
        Err(e) => {
            error!(error = ?e, "state change error");
            let _ = state
                .events
                .send(ConnectionEvent::ConnectionClosed(e.into()));
            return Err(());
        }
    }

    Ok(())
}