crowdstrike_cloudproto/services/lfo/
client.rs1use crate::framing::{CloudProtoError, CloudProtoPacket, CloudProtoSocket, CloudProtoVersion};
2use crate::services::lfo::pkt_kind::LfoPacketKind;
3use crate::services::lfo::request::LfoRequest;
4use crate::services::lfo::{LfoError, LfoResponse};
5use crate::services::CloudProtoMagic;
6use futures_util::{SinkExt, StreamExt};
7use tokio::io::{AsyncRead, AsyncWrite};
8use tracing::trace;
9
10pub struct LfoClient<IO: AsyncRead + AsyncWrite> {
12 sock: CloudProtoSocket<IO>,
13}
14
15impl<IO> LfoClient<IO>
16where
17 IO: AsyncRead + AsyncWrite,
18{
19 pub fn new(sock: CloudProtoSocket<IO>) -> Self {
20 Self { sock }
21 }
22
23 pub async fn get(&mut self, request: &LfoRequest) -> Result<LfoResponse, LfoError> {
25 let payload = request.to_payload();
26 trace!("Sending LFO request payload: {}", hex::encode(&payload));
27 let req_pkt = CloudProtoPacket {
28 magic: CloudProtoMagic::LFO,
29 kind: LfoPacketKind::GetFileRequest.into(),
30 version: CloudProtoVersion::Connect,
31 payload,
32 };
33 self.sock.send(req_pkt).await?;
34
35 if let Some(reply) = self.sock.next().await {
36 Ok(reply?.try_into()?)
37 } else {
38 Err(LfoError::CloudProto(CloudProtoError::ClosedByPeer(
39 "LFO server closed connection".to_owned(),
40 )))
41 }
42 }
43}
44
#[cfg(test)]
mod test {
    use crate::framing::{CloudProtoPacket, CloudProtoSocket, CloudProtoVersion};
    use crate::services::lfo::pkt_kind::LfoPacketKind;
    use crate::services::lfo::test::TEST_REPLY_DATA;
    use crate::services::lfo::{LfoClient, LfoError, LfoRequest};
    use crate::services::CloudProtoMagic;
    use futures_util::{SinkExt, StreamExt};
    use tokio::spawn;

    /// Runs one request/reply exchange between an `LfoClient` and a hand-written
    /// mock server connected through an in-memory duplex pipe.
    #[test_log::test(tokio::test)]
    async fn simple_mock_request() -> Result<(), LfoError> {
        let (client_io, server_io) = tokio::io::duplex(16 * 1024);
        let mut lfo_client = LfoClient::new(CloudProtoSocket::new(client_io));
        let mut mock_server = CloudProtoSocket::new(server_io);

        let expected_path = "/test/foo".to_string();
        let request = LfoRequest::new_simple(expected_path.clone());

        // Mock server: validate the incoming request, then answer with canned data.
        let mock_task = spawn(async move {
            let incoming = mock_server.next().await.unwrap()?;
            assert_eq!(incoming.magic, CloudProtoMagic::LFO);
            assert_eq!(incoming.version, CloudProtoVersion::Connect);
            assert_eq!(incoming.kind, LfoPacketKind::GetFileRequest);

            let decoded = LfoRequest::try_from_payload(&incoming.payload)?;
            assert_eq!(&decoded.remote_path, &expected_path);

            let canned_reply = CloudProtoPacket {
                magic: CloudProtoMagic::LFO,
                kind: LfoPacketKind::ReplyOk.into(),
                version: CloudProtoVersion::Normal,
                payload: hex::decode(TEST_REPLY_DATA).unwrap(),
            };
            mock_server.send(canned_reply).await?;
            Ok::<(), LfoError>(())
        });

        let response = lfo_client.get(&request).await?;
        assert_eq!(hex::encode(response.raw_lfo_payload()), TEST_REPLY_DATA);

        // Propagate any panic or error raised inside the mock server task.
        mock_task.await.unwrap()?;
        Ok(())
    }
}