llmp_test/
main.rs

1/*!
2This shows how llmp can be used directly, without libafl abstractions
3*/
4extern crate alloc;
5
6use core::marker::PhantomData;
7#[cfg(all(feature = "std", not(target_os = "haiku")))]
8use core::num::NonZeroUsize;
9#[cfg(not(target_os = "haiku"))]
10use core::time::Duration;
11#[cfg(all(feature = "std", not(target_os = "haiku")))]
12use std::{thread, time};
13
14#[cfg(all(feature = "std", not(target_os = "haiku")))]
15use libafl_core::{ClientId, Error};
16#[cfg(all(feature = "std", not(target_os = "haiku")))]
17use ll_mp::{self, Flags, LlmpHook, Tag};
18use ll_mp::{LlmpBrokerInner, LlmpMsgHookResult};
19#[cfg(all(feature = "std", not(target_os = "haiku")))]
20use shmem_providers::{ShMemProvider, StdShMemProvider};
21use tuple_list::tuple_list;
22
23#[cfg(all(feature = "std", not(target_os = "haiku")))]
24const _TAG_SIMPLE_U32_V1: Tag = Tag(0x5130_0321);
25#[cfg(all(feature = "std", not(target_os = "haiku")))]
26const _TAG_MATH_RESULT_V1: Tag = Tag(0x7747_4331);
27#[cfg(all(feature = "std", not(target_os = "haiku")))]
28const _TAG_1MEG_V1: Tag = Tag(0xB111_1161);
29
30/// The time the broker will wait for things to happen before printing a message
31#[cfg(all(feature = "std", not(target_os = "haiku")))]
32const BROKER_TIMEOUT: Duration = Duration::from_secs(10);
33
34/// How long the broker may sleep between forwarding a new chunk of sent messages
35#[cfg(all(feature = "std", not(target_os = "haiku")))]
36const SLEEP_BETWEEN_FORWARDS: Duration = Duration::from_millis(5);
37
38#[cfg(all(feature = "std", not(target_os = "haiku")))]
39fn adder_loop(port: u16) -> Result<(), Box<dyn core::error::Error>> {
40    let shmem_provider = StdShMemProvider::new()?;
41    let mut client = ll_mp::LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
42    let mut last_result: u32 = 0;
43    let mut current_result: u32 = 0;
44    loop {
45        let mut msg_counter = 0;
46        loop {
47            let Some((sender, tag, buf)) = client.recv_buf()? else {
48                break;
49            };
50            msg_counter += 1;
51            match tag {
52                _TAG_SIMPLE_U32_V1 => {
53                    current_result =
54                        current_result.wrapping_add(u32::from_le_bytes(buf.try_into()?));
55                }
56                _ => println!(
57                    "Adder Client ignored unknown message {:?} from client {:?} with {} bytes",
58                    tag,
59                    sender,
60                    buf.len()
61                ),
62            }
63        }
64
65        if current_result != last_result {
66            println!("Adder handled {msg_counter} messages, reporting {current_result} to broker");
67
68            client.send_buf(_TAG_MATH_RESULT_V1, &current_result.to_le_bytes())?;
69            last_result = current_result;
70        }
71
72        thread::sleep(time::Duration::from_millis(100));
73    }
74}
75
76#[cfg(all(feature = "std", not(target_os = "haiku")))]
77fn large_msg_loop(port: u16) -> Result<(), Box<dyn core::error::Error>> {
78    let mut client = ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
79
80    #[cfg(not(target_vendor = "apple"))]
81    let meg_buf = vec![1u8; 1 << 20];
82    #[cfg(target_vendor = "apple")]
83    let meg_buf = vec![1u8; 1 << 19];
84
85    loop {
86        client.send_buf(_TAG_1MEG_V1, &meg_buf)?;
87        #[cfg(not(target_vendor = "apple"))]
88        println!("Sending the next megabyte");
89        #[cfg(target_vendor = "apple")]
90        println!("Sending the next half megabyte (Apple had issues with >1 meg)");
91        thread::sleep(time::Duration::from_millis(100));
92    }
93}
94
95pub struct LlmpExampleHook<SP> {
96    phantom: PhantomData<SP>,
97}
98
99impl<SP> LlmpExampleHook<SP> {
100    #[must_use]
101    pub fn new() -> Self {
102        Self {
103            phantom: PhantomData,
104        }
105    }
106}
107
108impl<SP> Default for LlmpExampleHook<SP> {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114#[cfg(all(feature = "std", not(target_os = "haiku")))]
115impl<SHM, SP> LlmpHook<SHM, SP> for LlmpExampleHook<SP>
116where
117    SP: ShMemProvider<ShMem = SHM> + 'static,
118{
119    fn on_new_message(
120        &mut self,
121        _broker_inner: &mut LlmpBrokerInner<SHM, SP>,
122        client_id: ClientId,
123        msg_tag: &mut Tag,
124        _msg_flags: &mut Flags,
125        msg: &mut [u8],
126        _new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
127    ) -> Result<LlmpMsgHookResult, Error> {
128        match *msg_tag {
129            _TAG_SIMPLE_U32_V1 => {
130                println!(
131                    "Client {:?} sent message: {:?}",
132                    client_id,
133                    u32::from_le_bytes(msg.try_into()?)
134                );
135                Ok(LlmpMsgHookResult::ForwardToClients)
136            }
137            _TAG_MATH_RESULT_V1 => {
138                println!(
139                    "Adder Client has this current result: {:?}",
140                    u32::from_le_bytes(msg.try_into()?)
141                );
142                Ok(LlmpMsgHookResult::Handled)
143            }
144            _ => {
145                println!("Unknown message id received: {msg_tag:?}");
146                Ok(LlmpMsgHookResult::ForwardToClients)
147            }
148        }
149    }
150
151    fn on_timeout(&mut self) -> Result<(), Error> {
152        println!(
153            "No client did anything for {} seconds..",
154            BROKER_TIMEOUT.as_secs()
155        );
156
157        Ok(())
158    }
159}
160
161#[cfg(target_os = "haiku")]
162fn main() {
163    eprintln!("LLMP example is currently not supported on no_std. Implement ShMem for no_std.");
164}
165
166#[cfg(not(target_os = "haiku"))]
167fn main() -> Result<(), Box<dyn core::error::Error>> {
168    /* The main node has a broker, and a few worker threads */
169
170    use ll_mp::Broker;
171
172    let mode = std::env::args()
173        .nth(1)
174        .expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', 'large', or 'exiting'");
175    let port: u16 = std::env::args()
176        .nth(2)
177        .unwrap_or_else(|| "1337".into())
178        .parse::<u16>()?;
179    // in the b2b use-case, this is our "own" port, we connect to the "normal" broker node on startup.
180    let b2b_port: u16 = std::env::args()
181        .nth(3)
182        .unwrap_or_else(|| "4242".into())
183        .parse::<u16>()?;
184
185    // log::set_logger(..)
186    log::set_max_level(log::LevelFilter::Trace);
187    println!("Launching in mode {mode} on port {port}");
188
189    match mode.as_str() {
190        "broker" => {
191            let mut broker = ll_mp::LlmpBroker::new(
192                StdShMemProvider::new()?,
193                tuple_list!(LlmpExampleHook::new()),
194            )?;
195            broker.inner_mut().launch_tcp_listener_on(port)?;
196            // Exit when we got at least _n_ nodes, and all of them quit.
197            broker.set_exit_after(NonZeroUsize::new(1_usize).unwrap());
198            broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
199        }
200        "b2b" => {
201            let mut broker = ll_mp::LlmpBroker::new(
202                StdShMemProvider::new()?,
203                tuple_list!(LlmpExampleHook::new()),
204            )?;
205            broker.inner_mut().launch_tcp_listener_on(b2b_port)?;
206            // connect back to the main broker.
207            broker.inner_mut().connect_b2b(("127.0.0.1", port))?;
208            broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
209        }
210        "ctr" => {
211            let mut client =
212                ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
213            let mut counter: u32 = 0;
214            loop {
215                counter = counter.wrapping_add(1);
216                client.send_buf(_TAG_SIMPLE_U32_V1, &counter.to_le_bytes())?;
217                println!("CTR Client writing {counter}");
218                thread::sleep(Duration::from_secs(1));
219            }
220        }
221        "adder" => {
222            adder_loop(port)?;
223        }
224        "large" => {
225            large_msg_loop(port)?;
226        }
227        "exiting" => {
228            let mut client =
229                ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
230            for i in 0..10_u32 {
231                client.send_buf(_TAG_SIMPLE_U32_V1, &i.to_le_bytes())?;
232                println!("Exiting Client writing {i}");
233                thread::sleep(Duration::from_millis(10));
234            }
235            log::info!("Exiting Client exits");
236            client.sender_mut().send_exiting()?;
237
238            // there is another way to tell that this client wants to exit.
239            // one is to call client.sender_mut().send_exiting()?;
240            // you can disconnet the client in this way as long as this client in an unrecoverable state (like in a crash handler)
241            // another way to do this is through the detach_from_broker() call
242            // you can call detach_from_broker(port); to notify the broker that this broker wants to exit
243            // This one is usually for the event restarter to cut off the connection when the client has crashed.
244            // In that case we don't have access to the llmp client of the client anymore, but we can use detach_from_broker instead
245        }
246        _ => {
247            println!("No valid mode supplied");
248        }
249    }
250    Ok(())
251}