1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
//! # kik_sync_service //! //! A synchronous work system for dealing with small inputs that lead to heavy workloads. //! //! //! ## Why Use This //! //! This "*Message* + *worker* thread" system was built for the following situations: //! //! - When there is a small sample of inputs that can generate a very large amount of data. //! //! - When a small input can generate a very heavy workload. //! //! - When there is no need to store the data generated in the memory. //! //! - When a system must process a stream of data and send it directly somewhere else. //! //! - I.E. sound processing, image processing, handling network requests, etc. //! //! - When generating individual frames for some GUI. //! //! //! The system consists of several *Worker*s and a single thread (*FeederRecycler*) sending and retrieving messages. If the (work time)/message is high enough, //! this single thread handling input/output might have enough time to store data in the memory without slowing down the workers. //! It's up to the creativity of each user. //! //! # Example //! //! This is huge because we need to first create a struct that implements *MessageData*, then another for *MessageInput*, then another for *Message*. //! //! But what kind of data and how it's worked is all up to the user. Just create the *DeliveryService* channel, //! use default values or a custom *ChannelConfig* argument. Then create a vector of the input data you need. //! Feed that data into the *DeliveryService* by calling "*feed_feeder*" method. As soon as data starts being fed *Worker*s will work. //! Finally start getting the results with a "for" call to a mutable reference of the channel. //! Repeat feed and iterate as many times as needed. //! //! //! use kik_sync_service::message::{Message, MessageData, MessageInput}; //! use kik_sync_service::channel::{DeliveryService}; //! //! //! // What type of data should be returned. //! pub struct MessageArray{ //! data: [u32; 1024], //! } //! //! impl Clone for MessageArray{ //! fn clone(&self) -> Self{ //! let mut new_array: [u32; 1024] = [0; 1024]; //! for i in 0..1024{ //! new_array[i] = self.data[i]; //! } //! MessageArray{ //! data: new_array, //! } //! } //! } //! //! // with this trait it can be used as data for a Message. //! impl MessageData for MessageArray{ //! fn new() -> Self{ //! MessageArray{ //! data: [0; 1024], //! } //! } //! } //! //! impl MessageArray{ //! pub fn get(&mut self) -> &mut [u32; 1024]{ //! &mut self.data //! } //! } //! //! //! // What kind of input it needs. //! pub struct Coordinates{ //! pub x0: usize, //! pub y0: usize, //! pub x1: usize, //! pub y1: usize, //! } //! //! // Doesn't need to implement Copy, but needs Clone. //! impl Clone for Coordinates{ //! fn clone(&self) -> Self{ //! Coordinates{ //! x0: self.x0, //! y0: self.y0, //! x1: self.x1, //! y1: self.y1, //! } //! } //! } //! //! // This implementation tells the compiler that this object can be //! // used as input for the worker threads, and it can only work with MessageArray. //! impl MessageInput<MessageArray> for Coordinates{ //! fn new() -> Self{ //! Coordinates{ //! x0: 0, //! y0: 0, //! x1: 0, //! y1: 0, //! } //! } //! } //! //! //! // This is the message that holds both the data and input. //! // Feel free to add anything else you might need to work with it. //! pub struct ThreadMessage{ //! pub array: MessageArray, //! pub current_input: Coordinates, //! } //! //! impl Clone for ThreadMessage{ //! fn clone(&self) -> Self{ //! ThreadMessage{ //! array: self.array.clone(), //! current_input: self.current_input.clone(), //! } //! } //! } //! //! // ThreadMessage uses MessageArray as data, //! // ThreadMessage uses Coordinates as input to change the data. //! impl Message<MessageArray, Coordinates> for ThreadMessage { //! //! fn set_input(&mut self, message_input: Coordinates){ //! self.current_input = message_input.clone(); //! } //! //! fn work (&mut self){ //! let (x0, y0, x1, y1) = ( //! self.current_input.x0, //! self.current_input.y0, //! self.current_input.x1, //! self.current_input.y1, //! ); //! //! let array = self.array.get(); //! let mut counter: usize = 0; //! //! // Not very creative right now, each operation will count from 0 to 1024. //! // This is just to show that the results are being made and returned. I'll use it to generate fractals in the next project. //! // I'm thankful for anyone willing to offer a better example for this later. //! // Counting in the first line will go like 0 1 2 3 ... 30 31 1 2 3 ... //! // Counting in the second line will go like 32 33 34 35 ... 59 60 61 62 63 ... //! // And so forth until 1023 in the last line. //! for _y in (y0)..(y1){ //! for _x in (x0)..(x1){ //! let value = counter; //! array[counter] = value as u32; //! //! counter = counter + 1; //! } //! } //! } //! //! fn clone_message_data(&self) -> MessageArray{ //! self.array.clone() //! } //! //! fn new() -> Self{ //! let new_data = MessageArray::new(); //! let new_input = Coordinates::new(); //! //! ThreadMessage{ //! current_input: new_input, //! array: new_data, //! } //! } //! //! } //! //! // Finally, Now that all the data structure is set, time to use the channel. //! #[test] //! fn test(){ //! let width: usize = 1024; //! let height: usize = 768; //! let mut coordinates: Vec<Coordinates> = Vec::with_capacity((height as f32/32.0 * width as f32/32.0)as usize); //! assert_eq!(width % 32, 0); //! assert_eq!(height % 32, 0); //! //! // Creating a vec of coordinates to use as input. //! // for y in 0..24 //! for y in 0..(((height as f32)/32.0) as usize){ //! // for x in 0..32 //! for x in 0..(((width as f32)/32.0) as usize){ //! let (x0, y0) = (32 * x, 32 * y); //! coordinates.push(Coordinates{x0: x0, y0: y0, x1: x0 + 32, y1: y0 + 32}); //! } //! } //! // Personal Note: //! // create a vec of inputs //! // create channel //! // send the vec of inputs //! // iterate through the channel //! // print the resulting values //! //! //data is MessageArray //! //input is Coordinates //! //message is ThreadMessage //! //! // Creating a channel that uses MessageArray as MessageData, Coordinates as MessageInput, ThreadMessage as Message. Default config values have been used. //! let mut kiki_channel: DeliveryService<MessageArray,Coordinates,ThreadMessage> = DeliveryService::default(); //! kiki_channel.feed_feeder(&mut coordinates); //! //! let mut counter = 0; //! // Need to iterate through a mutable reference of kiki_channel to maintain ownership of it. //! for mut i in &mut kiki_channel{ //! let mut highest: u32 = 0; //! let message_array = i.get(); //! for j in message_array{ //! if highest < *j { //! highest = *j; //! } //! } //! // All the highest values for each line will be 31, 63, n * 32 -1, ... //! assert_eq!(highest % 32, 31); //! println!("Total line {}: {}", counter, highest); //! counter += 1; //! } //! //! // Creating another vec to feed the structure again. //! // for y in 0..24 //! for y in 0..(((height as f32)/32.0) as usize){ //! // for x in 0..32 //! for x in 0..(((width as f32)/32.0) as usize){ //! let (x0, y0) = (32 * x, 32 * y); //! coordinates.push(Coordinates{x0: x0, y0: y0, x1: x0 + 32, y1: y0 + 32}); //! } //! } //! //! // You can feed more input values after emptying the results from last run. //! kiki_channel.feed_feeder(&mut coordinates); //! //! let mut counter = 0; //! // The worker threads and feeder will only be closed when channel goes out of scope (unless they panic). //! // Need to iterate through a mutable reference of kiki_channel to maintain ownership of it. //! for mut i in &mut kiki_channel{ //! let mut highest: u32 = 0; //! let message_array = i.get(); //! for j in message_array{ //! if highest < *j { //! highest = *j; //! } //! } //! // Used this when I was testing as fn main //! // if counter % 13 == 0{ //! // println!("Total linha {}: {}", counter, total); //! // } //! //! // All the highest values for each line will be 31, 63, n * 32 -1, ... //! assert_eq!(highest % 32, 31); //! println!("Total line {}: {}", counter, highest); //! counter += 1; //! } //! } //! //! // This crate is a library #![crate_type = "lib"] // The library is named "kik_sync_service" #![crate_name = "kik_sync_service"] mod kik_message; mod kik_channel; mod kik_worker; mod kik_feeder; mod kik_message_example; /// Holds the traits used for message sharing and how to work them. They must be manually set by the user before using channel. pub mod message{ pub use crate::kik_message::{Message,MessageInput, MessageData}; } /// DeliveryService is the channel used for the synchronous message-sharing and work. It can be created with DeliveryService::default values or be customized by using ChannelConfig as argument for DeliveryService::new. pub mod channel{ pub use crate::kik_channel::{ChannelConfig, DeliveryService}; }