pub struct LlmpSender<SHM, SP> { /* private fields */ }Expand description
Sending end on a (unidirectional) shared map channel
Implementations§
Source§impl<SHM, SP> LlmpSender<SHM, SP>where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
An actor on the sending part of the shared map
impl<SHM, SP> LlmpSender<SHM, SP>where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
An actor on the sending part of the shared map
Sourcepub fn new(
shmem_provider: SP,
id: ClientId,
keep_pages_forever: bool,
) -> Result<Self, Error>
pub fn new( shmem_provider: SP, id: ClientId, keep_pages_forever: bool, ) -> Result<Self, Error>
Create a new LlmpSender using a given ShMemProvider, and id.
If keep_pages_forever is true, ShMem will never be freed.
If it is false, the pages will be unmapped once they are full, and have been mapped by at least one LlmpReceiver.
Sourcepub fn on_existing_from_env(
shmem_provider: SP,
env_name: &str,
) -> Result<Self, Error>
pub fn on_existing_from_env( shmem_provider: SP, env_name: &str, ) -> Result<Self, Error>
Reattach to a vacant out_shmem for which a previous sender stored the information in an env variable before.
Sourcepub fn on_existing_shmem(
shmem_provider: SP,
current_out_shmem: SHM,
last_msg_sent_offset: Option<u64>,
) -> Result<Self, Error>
pub fn on_existing_shmem( shmem_provider: SP, current_out_shmem: SHM, last_msg_sent_offset: Option<u64>, ) -> Result<Self, Error>
Reattach to a vacant out_shmem.
It is essential that the receiver (or someone else) keeps a pointer to this map;
otherwise reattach will get a new, empty page from the OS, or fail.
Sourcepub fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error>
pub fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error>
Allocates the next space on this sender page
Sourcepub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg)
pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg)
Cancel the send of the next message. This allows us to allocate a new message without sending this one.
§Safety
The msg pointer may no longer be used after cancel_send.
Sourcepub unsafe fn shrink_alloced(
&mut self,
msg: *mut LlmpMsg,
shrinked_len: usize,
) -> Result<(), Error>
pub unsafe fn shrink_alloced( &mut self, msg: *mut LlmpMsg, shrinked_len: usize, ) -> Result<(), Error>
Sourcepub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error>
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error>
Allocates a message of the given size, tags it, and sends it off.
Sourcepub fn send_buf_with_flags(
&mut self,
tag: Tag,
flags: Flags,
buf: &[u8],
) -> Result<(), Error>
pub fn send_buf_with_flags( &mut self, tag: Tag, flags: Flags, buf: &[u8], ) -> Result<(), Error>
Send a buf with the given flags.
Sourcepub fn on_existing_from_description(
shmem_provider: SP,
description: &LlmpDescription,
) -> Result<Self, Error>
pub fn on_existing_from_description( shmem_provider: SP, description: &LlmpDescription, ) -> Result<Self, Error>
Create this client on an existing map from the given description.
Acquired with LlmpSender::describe.
Sourcepub fn send_exiting(&mut self) -> Result<(), Error>
pub fn send_exiting(&mut self) -> Result<(), Error>
Send information that this client is exiting. The other side may free up all allocated memory. We are no longer allowed to send anything afterwards.
Examples found in repository?
167fn main() -> Result<(), Box<dyn core::error::Error>> {
168 /* The main node has a broker, and a few worker threads */
169
170 use ll_mp::Broker;
171
172 let mode = std::env::args()
173 .nth(1)
174 .expect("no mode specified, chose 'broker', 'b2b', 'ctr', 'adder', 'large', or 'exiting'");
175 let port: u16 = std::env::args()
176 .nth(2)
177 .unwrap_or_else(|| "1337".into())
178 .parse::<u16>()?;
179 // in the b2b use-case, this is our "own" port, we connect to the "normal" broker node on startup.
180 let b2b_port: u16 = std::env::args()
181 .nth(3)
182 .unwrap_or_else(|| "4242".into())
183 .parse::<u16>()?;
184
185 // log::set_logger(..)
186 log::set_max_level(log::LevelFilter::Trace);
187 println!("Launching in mode {mode} on port {port}");
188
189 match mode.as_str() {
190 "broker" => {
191 let mut broker = ll_mp::LlmpBroker::new(
192 StdShMemProvider::new()?,
193 tuple_list!(LlmpExampleHook::new()),
194 )?;
195 broker.inner_mut().launch_tcp_listener_on(port)?;
196 // Exit when we got at least _n_ nodes, and all of them quit.
197 broker.set_exit_after(NonZeroUsize::new(1_usize).unwrap());
198 broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
199 }
200 "b2b" => {
201 let mut broker = ll_mp::LlmpBroker::new(
202 StdShMemProvider::new()?,
203 tuple_list!(LlmpExampleHook::new()),
204 )?;
205 broker.inner_mut().launch_tcp_listener_on(b2b_port)?;
206 // connect back to the main broker.
207 broker.inner_mut().connect_b2b(("127.0.0.1", port))?;
208 broker.loop_with_timeouts(BROKER_TIMEOUT, Some(SLEEP_BETWEEN_FORWARDS));
209 }
210 "ctr" => {
211 let mut client =
212 ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
213 let mut counter: u32 = 0;
214 loop {
215 counter = counter.wrapping_add(1);
216 client.send_buf(_TAG_SIMPLE_U32_V1, &counter.to_le_bytes())?;
217 println!("CTR Client writing {counter}");
218 thread::sleep(Duration::from_secs(1));
219 }
220 }
221 "adder" => {
222 adder_loop(port)?;
223 }
224 "large" => {
225 large_msg_loop(port)?;
226 }
227 "exiting" => {
228 let mut client =
229 ll_mp::LlmpClient::create_attach_to_tcp(StdShMemProvider::new()?, port)?;
230 for i in 0..10_u32 {
231 client.send_buf(_TAG_SIMPLE_U32_V1, &i.to_le_bytes())?;
232 println!("Exiting Client writing {i}");
233 thread::sleep(Duration::from_millis(10));
234 }
235 log::info!("Exiting Client exits");
236 client.sender_mut().send_exiting()?;
237
238 // there is more than one way to tell the broker that this client wants to exit.
239 // one is to call client.sender_mut().send_exiting()?;
240 // you can disconnect the client this way as long as we still have access to it, i.e. it is not in an unrecoverable state (like in a crash handler)
241 // another way to do this is through the detach_from_broker() call
242 // you can call detach_from_broker(port); to notify the broker that this client wants to exit
243 // This one is usually for the event restarter to cut off the connection when the client has crashed.
244 // In that case we don't have access to the llmp client of the client anymore, but we can use detach_from_broker instead
245 }
246 _ => {
247 println!("No valid mode supplied");
248 }
249 }
250 Ok(())
251}Source§impl<SHM, SP> LlmpSender<SHM, SP>where
SHM: ShMem,
impl<SHM, SP> LlmpSender<SHM, SP>where
SHM: ShMem,
Sourcepub unsafe fn reset(&mut self)
pub unsafe fn reset(&mut self)
Completely reset the current sender map. Afterwards, no receiver should read from it at a different location. This is only useful if all connected llmp parties start over, for example after a crash.
§Safety
Only safe if you really, really restart the page on everything connected. No receiver should read from this page at a different location.
Sourcepub unsafe fn to_env(&self, env_name: &str) -> Result<(), Error>
pub unsafe fn to_env(&self, env_name: &str) -> Result<(), Error>
Store the info of this sender to env.
A new client can reattach to it using LlmpSender::on_existing_from_env().
§Safety
Writes to env variables and may only be done single-threaded.
Sourcepub fn await_safe_to_unmap_blocking(&self)
pub fn await_safe_to_unmap_blocking(&self)
Waits for this sender to be safe to unmap. If a receiver is involved, this function should always be called.
Sourcepub fn safe_to_unmap(&self) -> bool
pub fn safe_to_unmap(&self) -> bool
If we are allowed to unmap this client
Sourcepub unsafe fn mark_safe_to_unmap(&mut self)
pub unsafe fn mark_safe_to_unmap(&mut self)
For debug purposes: Mark safe to unmap, even though it might not have been read by a receiver yet.
§Safety
If this method is called, the page may be unmapped before it is read by any receiver.
Sourcepub fn describe(&self) -> Result<LlmpDescription, Error>
pub fn describe(&self) -> Result<LlmpDescription, Error>
Describe this LlmpSender in a way that it can be restored later, using Self::on_existing_from_description.