1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use crate::framing::{CloudProtoError, CloudProtoPacket, CloudProtoSocket, CloudProtoVersion};
use crate::services::lfo::pkt_kind::LfoPacketKind;
use crate::services::lfo::request::LfoRequest;
use crate::services::lfo::{LfoError, LfoResponse};
use crate::services::CloudProtoMagic;
use futures_util::{SinkExt, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::trace;

/// Request files stored on an LFO file server.
pub struct LfoClient<IO: AsyncRead + AsyncWrite> {
    sock: CloudProtoSocket<IO>,
}

impl<IO> LfoClient<IO>
where
    IO: AsyncRead + AsyncWrite,
{
    pub fn new(sock: CloudProtoSocket<IO>) -> Self {
        Self { sock }
    }

    /// Download the file at the remote path specified in the [`LfoRequest`](super::LfoRequest).
    pub async fn get(&mut self, request: &LfoRequest) -> Result<LfoResponse, LfoError> {
        let payload = request.to_payload();
        trace!("Sending LFO request payload: {}", hex::encode(&payload));
        let req_pkt = CloudProtoPacket {
            magic: CloudProtoMagic::LFO,
            kind: LfoPacketKind::GetFileRequest.into(),
            version: CloudProtoVersion::Connect,
            payload,
        };
        self.sock.send(req_pkt).await?;

        if let Some(reply) = self.sock.next().await {
            Ok(reply?.try_into()?)
        } else {
            Err(LfoError::CloudProto(CloudProtoError::ClosedByPeer(
                "LFO server closed connection".to_owned(),
            )))
        }
    }
}

#[cfg(test)]
mod test {
    use crate::framing::{CloudProtoPacket, CloudProtoSocket, CloudProtoVersion};
    use crate::services::lfo::pkt_kind::LfoPacketKind;
    use crate::services::lfo::test::TEST_REPLY_DATA;
    use crate::services::lfo::{LfoClient, LfoError, LfoRequest};
    use crate::services::CloudProtoMagic;
    use futures_util::{SinkExt, StreamExt};
    use tokio::spawn;

    #[test_log::test(tokio::test)]
    async fn simple_mock_request() -> Result<(), LfoError> {
        let (client, server) = tokio::io::duplex(16 * 1024);
        let mut client = LfoClient::new(CloudProtoSocket::new(client));
        let mut server = CloudProtoSocket::new(server);

        let req_path = "/test/foo".to_string();
        let req = LfoRequest::new_simple(req_path.clone());

        let server_task = spawn(async move {
            let req = server.next().await.unwrap()?;
            assert_eq!(req.magic, CloudProtoMagic::LFO);
            assert_eq!(req.version, CloudProtoVersion::Connect);
            assert_eq!(req.kind, LfoPacketKind::GetFileRequest);
            let req = LfoRequest::try_from_payload(&req.payload)?;
            assert_eq!(&req.remote_path, &req_path);

            server
                .send(CloudProtoPacket {
                    magic: CloudProtoMagic::LFO,
                    kind: LfoPacketKind::ReplyOk.into(),
                    version: CloudProtoVersion::Normal,
                    payload: hex::decode(TEST_REPLY_DATA).unwrap(),
                })
                .await?;
            Ok::<(), LfoError>(())
        });
        let reply = client.get(&req).await?;
        assert_eq!(hex::encode(reply.raw_lfo_payload()), TEST_REPLY_DATA);

        server_task.await.unwrap()?;
        Ok(())
    }
}