1extern crate alloc;
5
6use core::marker::PhantomData;
7#[cfg(all(feature = "std", not(target_os = "haiku")))]
8use core::num::NonZeroUsize;
9#[cfg(not(target_os = "haiku"))]
10use core::time::Duration;
11#[cfg(all(feature = "std", not(target_os = "haiku")))]
12use std::{thread, time};
13
14#[cfg(all(feature = "std", not(target_os = "haiku")))]
15use libafl_core::{ClientId, Error};
16#[cfg(all(feature = "std", not(target_os = "haiku")))]
17use ll_mp::{self, Flags, LlmpHook, Tag};
18use ll_mp::{LlmpBrokerInner, LlmpMsgHookResult};
19#[cfg(all(feature = "std", not(target_os = "haiku")))]
20use shmem_providers::{ShMemProvider, StdShMemProvider};
21use tuple_list::tuple_list;
22
23#[cfg(all(feature = "std", not(target_os = "haiku")))]
24const _TAG_SIMPLE_U32_V1: Tag = Tag(0x5130_0321);
25#[cfg(all(feature = "std", not(target_os = "haiku")))]
26const _TAG_MATH_RESULT_V1: Tag = Tag(0x7747_4331);
27#[cfg(all(feature = "std", not(target_os = "haiku")))]
28const _TAG_1MEG_V1: Tag = Tag(0xB111_1161);
29
30#[cfg(all(feature = "std", not(target_os = "haiku")))]
32const BROKER_TIMEOUT: Duration = Duration::from_secs(10);
33
34#[cfg(all(feature = "std", not(target_os = "haiku")))]
36const SLEEP_BETWEEN_FORWARDS: Duration = Duration::from_millis(5);
37
38#[cfg(all(feature = "std", not(target_os = "haiku")))]
39fn adder_loop(port: u16) -> Result<(), Box<dyn core::error::Error>> {
40 let shmem_provider = StdShMemProvider::new()?;
41 let mut client = ll_mp::LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
42 let mut last_result: u32 = 0;
43 let mut current_result: u32 = 0;
44 loop {
45 let mut msg_counter = 0;
46 loop {
47 let Some((sender, tag, buf)) = client.recv_buf()? else {
48 break;
49 };
50 msg_counter += 1;
51 match tag {
52 _TAG_SIMPLE_U32_V1 => {
53 current_result =
54 current_result.wrapping_add(u32::from_le_bytes(buf.try_into()?));
55 }
56 _ => println!(
57 "Adder Client ignored unknown message {:?} from client {:?} with {} bytes",
58 tag,
59 sender,
60 buf.len()
61 ),
62 }
63 }
64
65 if current_result != last_result {
66 println!("Adder handled {msg_counter} messages, reporting {current_result} to broker");
67
68 client.send_buf(_TAG_MATH_RESULT_V1, ¤t_result.to_le_bytes())?;
69 last_result = current_result;
70 }
71
72 thread::sleep(time::Duration::from_millis(100));
73 }
74}
75
76#[cfg(all(feature = "std", not(target_os = "haiku")))]
77fn large_msg_loop(port: u16) -> Result<(), Box<dyn core::error::Error>> {
78 let mut client = ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
79
80 #[cfg(not(target_vendor = "apple"))]
81 let meg_buf = vec![1u8; 1 << 20];
82 #[cfg(target_vendor = "apple")]
83 let meg_buf = vec![1u8; 1 << 19];
84
85 loop {
86 client.send_buf(_TAG_1MEG_V1, &meg_buf)?;
87 #[cfg(not(target_vendor = "apple"))]
88 println!("Sending the next megabyte");
89 #[cfg(target_vendor = "apple")]
90 println!("Sending the next half megabyte (Apple had issues with >1 meg)");
91 thread::sleep(time::Duration::from_millis(100));
92 }
93}
94
95pub struct LlmpExampleHook<SP> {
96 phantom: PhantomData<SP>,
97}
98
99impl<SP> LlmpExampleHook<SP> {
100 #[must_use]
101 pub fn new() -> Self {
102 Self {
103 phantom: PhantomData,
104 }
105 }
106}
107
108impl<SP> Default for LlmpExampleHook<SP> {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114#[cfg(all(feature = "std", not(target_os = "haiku")))]
115impl<SHM, SP> LlmpHook<SHM, SP> for LlmpExampleHook<SP>
116where
117 SP: ShMemProvider<ShMem = SHM> + 'static,
118{
119 fn on_new_message(
120 &mut self,
121 _broker_inner: &mut LlmpBrokerInner<SHM, SP>,
122 client_id: ClientId,
123 msg_tag: &mut Tag,
124 _msg_flags: &mut Flags,
125 msg: &mut [u8],
126 _new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
127 ) -> Result<LlmpMsgHookResult, Error> {
128 match *msg_tag {
129 _TAG_SIMPLE_U32_V1 => {
130 println!(
131 "Client {:?} sent message: {:?}",
132 client_id,
133 u32::from_le_bytes(msg.try_into()?)
134 );
135 Ok(LlmpMsgHookResult::ForwardToClients)
136 }
137 _TAG_MATH_RESULT_V1 => {
138 println!(
139 "Adder Client has this current result: {:?}",
140 u32::from_le_bytes(msg.try_into()?)
141 );
142 Ok(LlmpMsgHookResult::Handled)
143 }
144 _ => {
145 println!("Unknown message id received: {msg_tag:?}");
146 Ok(LlmpMsgHookResult::ForwardToClients)
147 }
148 }
149 }
150
151 fn on_timeout(&mut self) -> Result<(), Error> {
152 println!(
153 "No client did anything for {} seconds..",
154 BROKER_TIMEOUT.as_secs()
155 );
156
157 Ok(())
158 }
159}
160
161#[cfg(target_os = "haiku")]
162fn main() {
163 eprintln!("LLMP example is currently not supported on no_std. Implement ShMem for no_std.");
164}
165
166#[cfg(not(target_os = "haiku"))]
167fn main() -> Result<(), Box<dyn core::error::Error>> {
168 use ll_mp::Broker;
171
172 let mode = std::env::args()
173 .nth(1)
174 .expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', 'large', or 'exiting'");
175 let port: u16 = std::env::args()
176 .nth(2)
177 .unwrap_or_else(|| "1337".into())
178 .parse::<u16>()?;
179 let b2b_port: u16 = std::env::args()
181 .nth(3)
182 .unwrap_or_else(|| "4242".into())
183 .parse::<u16>()?;
184
185 log::set_max_level(log::LevelFilter::Trace);
187 println!("Launching in mode {mode} on port {port}");
188
189 match mode.as_str() {
190 "broker" => {
191 let mut broker = ll_mp::LlmpBroker::new(
192 StdShMemProvider::new()?,
193 tuple_list!(LlmpExampleHook::new()),
194 )?;
195 broker.inner_mut().launch_tcp_listener_on(port)?;
196 broker.set_exit_after(NonZeroUsize::new(1_usize).unwrap());
198 broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
199 }
200 "b2b" => {
201 let mut broker = ll_mp::LlmpBroker::new(
202 StdShMemProvider::new()?,
203 tuple_list!(LlmpExampleHook::new()),
204 )?;
205 broker.inner_mut().launch_tcp_listener_on(b2b_port)?;
206 broker.inner_mut().connect_b2b(("127.0.0.1", port))?;
208 broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
209 }
210 "ctr" => {
211 let mut client =
212 ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
213 let mut counter: u32 = 0;
214 loop {
215 counter = counter.wrapping_add(1);
216 client.send_buf(_TAG_SIMPLE_U32_V1, &counter.to_le_bytes())?;
217 println!("CTR Client writing {counter}");
218 thread::sleep(Duration::from_secs(1));
219 }
220 }
221 "adder" => {
222 adder_loop(port)?;
223 }
224 "large" => {
225 large_msg_loop(port)?;
226 }
227 "exiting" => {
228 let mut client =
229 ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
230 for i in 0..10_u32 {
231 client.send_buf(_TAG_SIMPLE_U32_V1, &i.to_le_bytes())?;
232 println!("Exiting Client writing {i}");
233 thread::sleep(Duration::from_millis(10));
234 }
235 log::info!("Exiting Client exits");
236 client.sender_mut().send_exiting()?;
237
238 }
246 _ => {
247 println!("No valid mode supplied");
248 }
249 }
250 Ok(())
251}