gemla 0.1.32

Using evolutionary computation to generate machine learning algorithms
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
//! This module provides the core logic for simulating genetic algorithms using a tournament bracket structure.
//!
//! - Defines the [`Gemla`] struct, which manages the simulation of populations as a tree of nodes.
//! - Nodes implement the [`GeneticNode`] trait and are managed using [`GeneticNodeWrapper`].
//! - Simulations are performed in a bracket configuration, allowing populations to compete and evolve.
//! - Includes configuration, tree management, and asynchronous simulation logic.
//!
//! [`Gemla`]: crate::core::Gemla
//! [`GeneticNode`]: crate::core::genetic_node::GeneticNode
//! [`GeneticNodeWrapper`]: crate::core::genetic_node::GeneticNodeWrapper

pub mod genetic_node;

use crate::{error::Error, tree::Tree};
use async_recursion::async_recursion;
use file_linked::{constants::data_format::DataFormat, FileLinked};
use futures::future;
use genetic_node::{GeneticNode, GeneticNodeWrapper, GeneticState};
use log::{info, trace, warn};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
    collections::HashMap, fmt::Debug, fs::File, io::ErrorKind, marker::Send, mem, path::Path,
    sync::Arc, time::Instant,
};
use tokio::{sync::RwLock, task::JoinHandle};
use uuid::Uuid;

type SimulationTree<T> = Box<Tree<GeneticNodeWrapper<T>>>;

/// Configuration options for managing a [`Gemla`] simulation.
///
/// The configuration is stored alongside the simulation tree in the linked data file, so a
/// simulation loaded from disk carries the configuration it was created with.
///
/// - `overwrite`: If true, existing simulation data will be overwritten when initializing a new simulation.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub struct GemlaConfig {
    /// If true, any simulation data already present at the target path is discarded and a fresh,
    /// empty simulation is created by [`Gemla::new`]. If false, existing data is loaded instead.
    pub overwrite: bool,
}

/// A tournament-style bracket that simulates and evaluates nodes of type `T`, where `T`
/// implements [`GeneticNode`].
///
/// The nodes are arranged in an unbalanced binary tree. A simulation works through that tree by
/// processing individual nodes, merging the results of finished children into their parent, and
/// evolving the resulting populations. Node processing runs asynchronously, and several nodes may
/// be processed concurrently.
///
/// # Type Parameters
/// - `T`: The node type. It must implement [`GeneticNode`] together with the serde
///   serialization traits, `Debug`, `Send`, and `Clone`.
///
/// # Fields
/// - `data`: The simulation tree, configuration, and context, kept in a [`FileLinked`] store.
/// - `threads`: Handles of the asynchronous tasks currently processing nodes, keyed by node id.
///
/// # Example
/// ```ignore
/// let config = GemlaConfig { overwrite: true };
/// let mut gemla = Gemla::<MyNodeType>::new(path, config, DataFormat::Json).await?;
/// gemla.simulate(5).await?;
/// ```
///
/// [`GeneticNode`]: crate::core::genetic_node::GeneticNode
pub struct Gemla<T>
where
    T: GeneticNode + Clone + Debug + Send + Serialize + DeserializeOwned,
    T::Context: Default + Clone + Debug + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// File-linked simulation state: the tree (absent until first grown), the configuration, and
    /// the context shared with every node.
    pub data: FileLinked<(Option<SimulationTree<T>>, GemlaConfig, T::Context)>,
    /// Join handles of in-flight node tasks, keyed by the id of the node each task processes.
    threads: HashMap<Uuid, JoinHandle<Result<GeneticNodeWrapper<T>, Error>>>,
}

impl<T: 'static> Gemla<T>
where
    T: GeneticNode + Serialize + DeserializeOwned + Debug + Send + Sync + Clone,
    T::Context: Send + Sync + Clone + Debug + Serialize + DeserializeOwned + 'static + Default,
{
    /// Creates a new [`Gemla`] instance, initializing or loading the simulation data from the specified path.
    ///
    /// # Arguments
    /// - `path`: The file system path to load or create the simulation data.
    /// - `config`: Configuration options for the simulation.
    /// - `data_format`: The format of the data (e.g., JSON, binary).
    ///
    /// # Returns
    /// - `Ok(Self)`: A new [`Gemla`] instance.
    /// - `Err(Error)`: An error occurred during initialization or loading.
    pub async fn new(
        path: &Path,
        config: GemlaConfig,
        data_format: DataFormat,
    ) -> Result<Self, Error> {
        match File::open(path) {
            // If the file exists we either want to overwrite the file or read from the file
            // based on the configuration provided
            Ok(_) => Ok(Gemla {
                data: if config.overwrite {
                    FileLinked::new((None, config, T::Context::default()), path, data_format)
                        .await?
                } else {
                    FileLinked::from_file(path, data_format)?
                },
                threads: HashMap::new(),
            }),
            // If the file doesn't exist we must create it
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(Gemla {
                data: FileLinked::new((None, config, T::Context::default()), path, data_format)
                    .await?,
                threads: HashMap::new(),
            }),
            Err(error) => Err(Error::IO(error)),
        }
    }

    /// Returns a read-only reference to the simulation tree, configuration, and context.
    pub fn tree_ref(&self) -> Arc<RwLock<(Option<SimulationTree<T>>, GemlaConfig, T::Context)>> {
        self.data.readonly().clone()
    }

    /// Simulates the genetic algorithm for the specified number of steps, processing and evolving the population.
    ///
    /// # Arguments
    /// - `steps`: The number of simulation steps to perform. A simulation step increases the height of the tree by one and continues processing until all nodes are completed.
    ///
    /// # Returns
    /// - `Ok(())`: Simulation completed successfully.
    /// - `Err(Error)`: An error occurred during simulation.
    pub async fn simulate(&mut self, steps: u64) -> Result<(), Error> {
        let tree_completed = {
            // Only increase height if the tree is uninitialized or completed
            let data_arc = self.data.readonly();
            let data_ref = data_arc.read().await;
            let tree_ref = data_ref.0.as_ref();

            tree_ref.is_none() || tree_ref.map(|t| Gemla::is_completed(t)).unwrap_or(true)
        };

        if tree_completed {
            // Before we can process nodes we must create blank nodes in their place to keep track of which nodes have been processed
            // in the tree and which nodes have not.
            self.data
                .mutate(|(d, _, _)| {
                    let mut tree: Option<SimulationTree<T>> =
                        Gemla::increase_height(d.take(), steps);
                    mem::swap(d, &mut tree);
                })
                .await?;
        }

        {
            // Only increase height if the tree is uninitialized or completed
            let data_arc = self.data.readonly();
            let data_ref = data_arc.read().await;
            let tree_ref = data_ref.0.as_ref();

            info!(
                "Height of simulation tree increased to {}",
                tree_ref
                    .map(|t| format!("{}", t.height()))
                    .unwrap_or_else(|| "Tree is not defined".to_string())
            );
        }

        loop {
            let is_tree_processed;

            {
                let data_arc = self.data.readonly();
                let data_ref = data_arc.read().await;
                let tree_ref = data_ref.0.as_ref();

                is_tree_processed = tree_ref.map(|t| Gemla::is_completed(t)).unwrap_or(false)
            }

            // We need to keep simulating until the tree has been completely processed.
            if is_tree_processed {
                self.join_threads().await?;

                info!("Processed tree");
                break;
            }

            let (node, gemla_context) = {
                let data_arc = self.data.readonly();
                let data_ref = data_arc.read().await;
                let (tree_ref, _, gemla_context) = &*data_ref; // (Option<Box<Tree<GeneticNodeWrapper<T>>>, GemlaConfig, T::Context)

                let node = tree_ref.as_ref().and_then(|t| self.get_unprocessed_node(t));

                (node, gemla_context.clone())
            };

            if let Some(node) = node {
                trace!("Adding node to process list {}", node.id());

                let gemla_context = gemla_context.clone();

                self.threads.insert(
                    node.id(),
                    tokio::spawn(async move { Gemla::process_node(node, gemla_context).await }),
                );
            } else {
                trace!("No node found to process, joining threads");

                self.join_threads().await?;
            }
        }

        Ok(())
    }

    async fn join_threads(&mut self) -> Result<(), Error> {
        if !self.threads.is_empty() {
            trace!("Joining threads for nodes {:?}", self.threads.keys());

            let results = future::join_all(self.threads.values_mut()).await;

            // Converting a list of results into a result wrapping the list
            let reduced_results: Result<Vec<GeneticNodeWrapper<T>>, Error> =
                results.into_iter().flatten().collect();
            self.threads.clear();

            // We need to retrieve the processed nodes from the resulting list and replace them in the original list
            match reduced_results {
                Ok(r) => {
                    self.data
                        .mutate_async(|d| async move {
                            // Scope to limit the duration of the read lock
                            let (_, context) = {
                                let data_read = d.read().await;
                                (data_read.1, data_read.2.clone())
                            }; // Read lock is dropped here

                            let mut data_write = d.write().await;

                            if let Some(t) = data_write.0.as_mut() {
                                let failed_nodes = Gemla::replace_nodes(t, r);
                                // We receive a list of nodes that were unable to be found in the original tree
                                if !failed_nodes.is_empty() {
                                    warn!(
                                        "Unable to find {:?} to replace in tree",
                                        failed_nodes.iter().map(|n| n.id())
                                    )
                                }

                                // Once the nodes are replaced we need to find nodes that can be merged from the completed children nodes
                                Gemla::merge_completed_nodes(t, context.clone()).await
                            } else {
                                warn!("Unable to replce nodes {:?} in empty tree", r);
                                Ok(())
                            }
                        })
                        .await??;
                }
                Err(e) => return Err(e),
            }
        }

        Ok(())
    }

    #[async_recursion]
    async fn merge_completed_nodes<'a>(
        tree: &'a mut SimulationTree<T>,
        gemla_context: T::Context,
    ) -> Result<(), Error> {
        if tree.val.state() == GeneticState::Initialize {
            match (&mut tree.left, &mut tree.right) {
                // If the current node has been initialized, and has children nodes that are completed, then we need
                // to merge the children nodes together into the parent node
                (Some(l), Some(r))
                    if l.val.state() == GeneticState::Finish
                        && r.val.state() == GeneticState::Finish =>
                {
                    info!("Merging nodes {} and {}", l.val.id(), r.val.id());
                    if let (Some(left_node), Some(right_node)) = (l.val.as_ref(), r.val.as_ref()) {
                        let merged_node = GeneticNode::merge(
                            left_node,
                            right_node,
                            &tree.val.id(),
                            gemla_context.clone(),
                        )
                        .await?;
                        tree.val = GeneticNodeWrapper::from(*merged_node, tree.val.id());
                    }
                }
                (Some(l), Some(r)) => {
                    Gemla::merge_completed_nodes(l, gemla_context.clone()).await?;
                    Gemla::merge_completed_nodes(r, gemla_context.clone()).await?;
                }
                // If there is only one child node that's completed then we want to copy it to the parent node
                (Some(l), None) if l.val.state() == GeneticState::Finish => {
                    trace!("Copying node {}", l.val.id());

                    if let Some(left_node) = l.val.as_ref() {
                        GeneticNodeWrapper::from(left_node.clone(), tree.val.id());
                    }
                }
                (Some(l), None) => Gemla::merge_completed_nodes(l, gemla_context.clone()).await?,
                (None, Some(r)) if r.val.state() == GeneticState::Finish => {
                    trace!("Copying node {}", r.val.id());

                    if let Some(right_node) = r.val.as_ref() {
                        tree.val = GeneticNodeWrapper::from(right_node.clone(), tree.val.id());
                    }
                }
                (None, Some(r)) => Gemla::merge_completed_nodes(r, gemla_context.clone()).await?,
                (_, _) => (),
            }
        }

        Ok(())
    }

    fn get_unprocessed_node(&self, tree: &SimulationTree<T>) -> Option<GeneticNodeWrapper<T>> {
        // If the current node has been processed or exists in the thread list then we want to stop recursing. Checking if it exists in the thread list
        // should be fine because we process the tree from bottom to top.
        if tree.val.state() != GeneticState::Finish && !self.threads.contains_key(&tree.val.id()) {
            match (&tree.left, &tree.right) {
                // If the children are finished we can start processing the currrent node. The current node should be merged from the children already
                // during join_threads.
                (Some(l), Some(r))
                    if l.val.state() == GeneticState::Finish
                        && r.val.state() == GeneticState::Finish =>
                {
                    Some(tree.val.clone())
                }
                (Some(l), Some(r)) => self
                    .get_unprocessed_node(l)
                    .or_else(|| self.get_unprocessed_node(r)),
                (Some(l), None) => self.get_unprocessed_node(l),
                (None, Some(r)) => self.get_unprocessed_node(r),
                (None, None) => Some(tree.val.clone()),
            }
        } else {
            None
        }
    }

    fn replace_nodes(
        tree: &mut SimulationTree<T>,
        mut nodes: Vec<GeneticNodeWrapper<T>>,
    ) -> Vec<GeneticNodeWrapper<T>> {
        // Replacing nodes as we recurse through the tree
        if let Some(i) = nodes.iter().position(|n| n.id() == tree.val.id()) {
            tree.val = nodes.remove(i);
        }

        match (&mut tree.left, &mut tree.right) {
            (Some(l), Some(r)) => Gemla::replace_nodes(r, Gemla::replace_nodes(l, nodes)),
            (Some(l), None) => Gemla::replace_nodes(l, nodes),
            (None, Some(r)) => Gemla::replace_nodes(r, nodes),
            _ => nodes,
        }
    }

    fn increase_height(tree: Option<SimulationTree<T>>, amount: u64) -> Option<SimulationTree<T>> {
        if amount == 0 {
            tree
        } else {
            let left_branch_height =
                tree.as_ref().map(|t| t.height() as u64).unwrap_or(0) + amount - 1;

            Some(Box::new(Tree::new(
                GeneticNodeWrapper::new(),
                Gemla::increase_height(tree, amount - 1),
                // The right branch height has to equal the left branches total height
                if left_branch_height > 0 {
                    Some(Box::new(btree!(GeneticNodeWrapper::new())))
                } else {
                    None
                },
            )))
        }
    }

    fn is_completed(tree: &SimulationTree<T>) -> bool {
        // If the current node is finished, then by convention the children should all be finished as well
        tree.val.state() == GeneticState::Finish
    }

    async fn process_node(
        mut node: GeneticNodeWrapper<T>,
        gemla_context: T::Context,
    ) -> Result<GeneticNodeWrapper<T>, Error> {
        let node_state_time = Instant::now();
        let node_state = node.state();

        node.process_node(gemla_context.clone()).await?;

        info!(
            "{:?} completed in {:?} for {}",
            node_state,
            node_state_time.elapsed(),
            node.id()
        );

        if node.state() == GeneticState::Finish {
            info!("Processed node {}", node.id());
        }

        Ok(node)
    }
}

#[cfg(test)]
mod tests {
    use crate::core::*;
    use async_trait::async_trait;
    use serde::{Deserialize, Serialize};
    use std::fs;
    use std::path::PathBuf;
    use tokio::runtime::Runtime;

    use self::genetic_node::GeneticNodeContext;

    /// Guard that removes the file at `path`, if it exists, when the guard is dropped. Test
    /// artifacts are therefore cleaned up after the test body runs.
    struct CleanUp {
        path: PathBuf,
    }

    impl CleanUp {
        /// Creates a guard for an owned copy of `path`.
        fn new(path: &Path) -> CleanUp {
            CleanUp {
                path: path.to_path_buf(),
            }
        }

        /// Runs `op` with the guarded path and returns its result unchanged.
        pub fn run<F: FnOnce(&Path) -> Result<(), Error>>(&self, op: F) -> Result<(), Error> {
            op(&self.path)
        }
    }

    impl Drop for CleanUp {
        fn drop(&mut self) {
            if self.path.exists() {
                fs::remove_file(&self.path).expect("Unable to remove file");
            }
        }
    }

    /// Minimal node type used to drive `Gemla` in tests.
    #[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
    struct TestState {
        pub score: f64,
        pub max_generations: u64,
    }

    #[async_trait]
    impl genetic_node::GeneticNode for TestState {
        type Context = ();

        // Adds 1.0 to the score on every call. Returns whether `context.generation` is still
        // below `max_generations`. NOTE(review): presumably `true` means "keep simulating";
        // confirm against GeneticNode's contract.
        async fn simulate(
            &mut self,
            context: GeneticNodeContext<Self::Context>,
        ) -> Result<bool, Error> {
            self.score += 1.0;
            Ok(context.generation < self.max_generations)
        }

        // Mutation is a no-op for this test node.
        async fn mutate(
            &mut self,
            _context: GeneticNodeContext<Self::Context>,
        ) -> Result<(), Error> {
            Ok(())
        }

        // Every node starts with a zero score and a limit of 10 generations.
        async fn initialize(
            _context: GeneticNodeContext<Self::Context>,
        ) -> Result<Box<TestState>, Error> {
            Ok(Box::new(TestState {
                score: 0.0,
                max_generations: 10,
            }))
        }

        // Keeps whichever side has the strictly higher score; ties go to `right`.
        async fn merge(
            left: &TestState,
            right: &TestState,
            _id: &Uuid,
            _: Self::Context,
        ) -> Result<Box<TestState>, Error> {
            Ok(Box::new(if left.score > right.score {
                left.clone()
            } else {
                right.clone()
            }))
        }
    }

    /// Exercises `Gemla::new` for creation, overwriting, and loading of the simulation file.
    #[tokio::test]
    async fn test_new() -> Result<(), Error> {
        let path = PathBuf::from("test_new_non_existing");
        // Use `spawn_blocking` to run synchronous code that needs to call async code internally.
        tokio::task::spawn_blocking(move || {
            let rt = Runtime::new().unwrap(); // Create a new Tokio runtime for the async block.
            CleanUp::new(&path).run(move |p| {
                rt.block_on(async {
                    assert!(!path.exists());

                    // Testing initial creation
                    let mut config = GemlaConfig { overwrite: true };
                    let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json).await?;

                    // Now we can use `.await` within the spawned blocking task.
                    gemla.simulate(2).await?;
                    let data = gemla.data.readonly();
                    let data_lock = data.read().await;
                    assert_eq!(data_lock.0.as_ref().unwrap().height(), 2);

                    // Release the read guard and the instance before checking the file on disk.
                    drop(data_lock);
                    drop(gemla);
                    assert!(path.exists());

                    // Testing overwriting data: the existing file is replaced, so the height
                    // starts over at 2.
                    let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json).await?;

                    gemla.simulate(2).await?;
                    let data = gemla.data.readonly();
                    let data_lock = data.read().await;
                    assert_eq!(data_lock.0.as_ref().unwrap().height(), 2);

                    drop(data_lock);
                    drop(gemla);
                    assert!(path.exists());

                    // Testing not-overwriting data: the completed height-2 tree is loaded and
                    // grown by two more levels.
                    config.overwrite = false;
                    let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json).await?;

                    gemla.simulate(2).await?;
                    let data = gemla.data.readonly();
                    let data_lock = data.read().await;
                    let tree = data_lock.0.as_ref().unwrap();
                    assert_eq!(tree.height(), 4);

                    drop(data_lock);
                    drop(gemla);
                    assert!(path.exists());

                    Ok(())
                })
            })
        })
        .await
        .unwrap()?; // Wait for the blocking task to complete, then handle the Result.

        Ok(())
    }

    /// Runs a five-level simulation and checks the resulting tree height and root score.
    #[tokio::test]
    async fn test_simulate() -> Result<(), Error> {
        let path = PathBuf::from("test_simulate");
        // Use `spawn_blocking` to run the synchronous closure that internally awaits async code.
        tokio::task::spawn_blocking(move || {
            let rt = Runtime::new().unwrap(); // Create a new Tokio runtime for the async block.
            CleanUp::new(&path).run(move |p| {
                rt.block_on(async {
                    // Testing initial creation
                    let config = GemlaConfig { overwrite: true };
                    let mut gemla = Gemla::<TestState>::new(&p, config, DataFormat::Json).await?;

                    // Now we can use `.await` within the spawned blocking task.
                    gemla.simulate(5).await?;
                    let data = gemla.data.readonly();
                    let data_lock = data.read().await;
                    let tree = data_lock.0.as_ref().unwrap();
                    assert_eq!(tree.height(), 5);
                    assert_eq!(tree.val.as_ref().unwrap().score, 50.0);

                    Ok(())
                })
            })
        })
        .await
        .unwrap()?; // Wait for the blocking task to complete, then handle the Result.

        Ok(())
    }
}