crowdstrike_cloudproto/services/lfo/
client.rs

1use crate::framing::{CloudProtoError, CloudProtoPacket, CloudProtoSocket, CloudProtoVersion};
2use crate::services::lfo::pkt_kind::LfoPacketKind;
3use crate::services::lfo::request::LfoRequest;
4use crate::services::lfo::{LfoError, LfoResponse};
5use crate::services::CloudProtoMagic;
6use futures_util::{SinkExt, StreamExt};
7use tokio::io::{AsyncRead, AsyncWrite};
8use tracing::trace;
9
10/// Request files stored on an LFO file server.
11pub struct LfoClient<IO: AsyncRead + AsyncWrite> {
12    sock: CloudProtoSocket<IO>,
13}
14
15impl<IO> LfoClient<IO>
16where
17    IO: AsyncRead + AsyncWrite,
18{
19    pub fn new(sock: CloudProtoSocket<IO>) -> Self {
20        Self { sock }
21    }
22
23    /// Download the file at the remote path specified in the [`LfoRequest`](super::LfoRequest).
24    pub async fn get(&mut self, request: &LfoRequest) -> Result<LfoResponse, LfoError> {
25        let payload = request.to_payload();
26        trace!("Sending LFO request payload: {}", hex::encode(&payload));
27        let req_pkt = CloudProtoPacket {
28            magic: CloudProtoMagic::LFO,
29            kind: LfoPacketKind::GetFileRequest.into(),
30            version: CloudProtoVersion::Connect,
31            payload,
32        };
33        self.sock.send(req_pkt).await?;
34
35        if let Some(reply) = self.sock.next().await {
36            Ok(reply?.try_into()?)
37        } else {
38            Err(LfoError::CloudProto(CloudProtoError::ClosedByPeer(
39                "LFO server closed connection".to_owned(),
40            )))
41        }
42    }
43}
44
45#[cfg(test)]
46mod test {
47    use crate::framing::{CloudProtoPacket, CloudProtoSocket, CloudProtoVersion};
48    use crate::services::lfo::pkt_kind::LfoPacketKind;
49    use crate::services::lfo::test::TEST_REPLY_DATA;
50    use crate::services::lfo::{LfoClient, LfoError, LfoRequest};
51    use crate::services::CloudProtoMagic;
52    use futures_util::{SinkExt, StreamExt};
53    use tokio::spawn;
54
55    #[test_log::test(tokio::test)]
56    async fn simple_mock_request() -> Result<(), LfoError> {
57        let (client, server) = tokio::io::duplex(16 * 1024);
58        let mut client = LfoClient::new(CloudProtoSocket::new(client));
59        let mut server = CloudProtoSocket::new(server);
60
61        let req_path = "/test/foo".to_string();
62        let req = LfoRequest::new_simple(req_path.clone());
63
64        let server_task = spawn(async move {
65            let req = server.next().await.unwrap()?;
66            assert_eq!(req.magic, CloudProtoMagic::LFO);
67            assert_eq!(req.version, CloudProtoVersion::Connect);
68            assert_eq!(req.kind, LfoPacketKind::GetFileRequest);
69            let req = LfoRequest::try_from_payload(&req.payload)?;
70            assert_eq!(&req.remote_path, &req_path);
71
72            server
73                .send(CloudProtoPacket {
74                    magic: CloudProtoMagic::LFO,
75                    kind: LfoPacketKind::ReplyOk.into(),
76                    version: CloudProtoVersion::Normal,
77                    payload: hex::decode(TEST_REPLY_DATA).unwrap(),
78                })
79                .await?;
80            Ok::<(), LfoError>(())
81        });
82        let reply = client.get(&req).await?;
83        assert_eq!(hex::encode(reply.raw_lfo_payload()), TEST_REPLY_DATA);
84
85        server_task.await.unwrap()?;
86        Ok(())
87    }
88}