use super::layer::layers::{Layer, LayerTypes, InputTypes};
use super::layer::methods::errors::ErrorTypes;
use super::layer::methods::pair::GradientPair;
use super::data::matrix::Matrix;
use super::data::input::Input;
use super::serialize::ser_layer::SerializedLayer;
use rand::{RngCore, Rng, thread_rng};
use rand_pcg::Pcg64;
use rand_seeder::Seeder;
use serde::{Serialize, Deserialize};
use futures::stream::StreamExt;
use serde_json::{to_string, from_str};
use std::{
    fs::{self, File},
    io::{self, Read, Write},
};
#[derive(Serialize, Deserialize)]
pub struct Network {
batch_size: usize,
pub layer_sizes: Vec<usize>,
pub loss: f32,
loss_train: Vec<f32>,
pub layers: Vec<Box<dyn Layer>>,
uncompiled_layers: Vec<LayerTypes>,
seed: Option<String>,
#[serde(skip)]
#[serde(default = "Network::thread_rng")]
rng: Box<dyn RngCore>,
log: bool
}
const ITERATIONS_PER_EPOCH: usize = 1000;
impl Network{
fn thread_rng() -> Box<dyn RngCore> {
Box::new(thread_rng())
}
pub fn set_log(&mut self, state: bool) {
        self.log = state;
}
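    /// Builds an empty network that averages gradients over `batch_size`
    /// samples at a time. Layers are queued with `add_layer` and only become
    /// usable once `compile` has been called.
    ///
    /// A minimal sketch; the exact `LayerTypes` variant signature is an
    /// assumption, not something this file defines:
    ///
    /// ```ignore
    /// let mut net = Network::new(32);
    /// net.set_input(InputTypes::DENSE(2));
    /// net.add_layer(LayerTypes::DENSE(1 /* plus activation, learning rate, ... */));
    /// net.compile();
    /// ```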
pub fn new(batch_size: usize) -> Network{
Network{
batch_size,
layer_sizes: vec![],
loss: 1.0,
layers: vec![],
uncompiled_layers: vec![],
loss_train: vec![],
seed: None,
rng: Box::new(thread_rng()),
log: true
}
}
pub fn get_layer_loss(&self) -> Vec<(f32, f32)> {
let mut res: Vec<(f32, f32)> = vec![];
        for i in 0..self.layers.len().saturating_sub(1) {
            res.push((i as f32, self.layers[i].get_loss()));
        }
res
}
pub fn get_loss_history(&self) -> Vec<f32> {
self.loss_train.clone()
}
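    /// Queues a layer description without instantiating it; the concrete
    /// `Layer` is only built in `compile`, because construction needs the size
    /// of the preceding layer.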
pub fn add_layer(&mut self, layer: LayerTypes){
self.layer_sizes.push(layer.get_size());
self.uncompiled_layers.push(layer);
}
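    /// Sets or replaces the input layer: if any layer has been queued, slot 0
    /// is overwritten (see the `check_set_input` test below), otherwise the
    /// input becomes the first entry.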
pub fn set_input(&mut self, input: InputTypes){
        if !self.layer_sizes.is_empty() {
self.layer_sizes[0] = input.get_size();
self.uncompiled_layers[0] = input.to_layer();
} else {
self.layer_sizes.push(input.get_size());
self.uncompiled_layers.push(input.to_layer());
}
}
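    /// Instantiates every queued layer. Index 0 of the queue is the input
    /// description and produces no `Layer` of its own; each remaining layer is
    /// built against the size of the layer before it, using the network RNG
    /// for weight initialization.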
pub fn compile(&mut self){
for i in 1..self.uncompiled_layers.len() {
let new_layer = self.uncompiled_layers[i].to_layer(self.layer_sizes[i-1], &mut self.rng);
self.layers.push(new_layer);
}
}
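    /// Runs a single forward pass and returns the raw output activations.
    ///
    /// A sketch, assuming `Vec<f32>` implements `Input` (the training loop in
    /// `fit` leans on the same assumption):
    ///
    /// ```ignore
    /// let out = net.predict(&vec![0.0_f32, 1.0]);
    /// println!("prediction: {:?}", out);
    /// ```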
pub fn predict(&mut self, input: &dyn Input) -> Vec<f32>{
let in_box: Box<dyn Input> = input.to_box();
self.feed_forward(&in_box)
}
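    /// Seeds the internal RNG so weight initialization and minibatch shuffling
    /// are reproducible: the same seed string always produces the same `Pcg64`
    /// stream (see `get_rng`). Call this before `compile` if you want
    /// deterministic initial weights.
    ///
    /// ```ignore
    /// let mut net = Network::new(32);
    /// net.set_seed("my-reproducible-run");
    /// ```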
pub fn set_seed(&mut self, seed: &str){
self.seed = Some(String::from(seed));
self.rng = self.get_rng();
}
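    /// Feeds every sample of one minibatch forward and backward concurrently,
    /// then has each layer average the per-sample gradients. Returns
    /// `(bias gradients, weight gradients)` with one entry per layer.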
async fn get_minibatch_gradient(&self, minibatch: &Vec<(Box<dyn Input>, Vec<f32>)>) -> (Vec<Box<dyn Input>>, Vec<Box<dyn Input>>) {
let len = minibatch.len();
let gradients = futures::stream::iter(minibatch)
.map(|input_output| self.feed_forward_async(&input_output.0, &input_output.1))
.buffer_unordered(len)
            .map(|data_output| self.back_propagate_async(data_output.0, data_output.1))
.buffer_unordered(len)
            .collect::<Vec<_>>()
.await;
        let (mut bias_gradients, mut weight_gradients) = (vec![], vec![]);
gradients.iter().for_each(|pair| {
let mut gradient_bias = vec![];
let mut gradient_weight = vec![];
pair.iter().for_each(|GradientPair(bias, weight)| {
gradient_bias.push(bias);
gradient_weight.push(weight);
});
bias_gradients.push(gradient_bias);
weight_gradients.push(gradient_weight);
});
let mut avg_weights_gradient:Vec<Box<dyn Input>> = vec![];
let mut avg_bias_gradient:Vec<Box<dyn Input>> = vec![];
for layer_gradient in 0..weight_gradients[0].len() {
avg_bias_gradient.push(self.layers[layer_gradient].avg_gradient(bias_gradients.iter().map(|grad| grad[layer_gradient]).collect::<Vec<_>>()));
avg_weights_gradient.push(self.layers[layer_gradient].avg_gradient(weight_gradients.iter().map(|grad| grad[layer_gradient]).collect::<Vec<_>>()));
}
(avg_bias_gradient, avg_weights_gradient)
}
fn feed_forward(&mut self, input_obj: &Box<dyn Input>) -> Vec<f32> {
        if input_obj.to_param().shape() != self.layers[0].shape() {
            panic!("Input shape does not match input layer shape\nInput: {:?}\nInput layer: {:?}", input_obj.to_param().shape(), self.layers[0].shape());
        }
let mut data_at: Box<dyn Input> = Box::new(input_obj.to_param());
for i in 0..self.layers.len(){
data_at = self.layers[i].forward(&data_at);
self.layers[i].set_data(&data_at);
}
data_at.to_param().to_owned()
}
async fn feed_forward_async(&self, input_obj: &Box<dyn Input>, output: &Vec<f32>) -> (Vec<Box<dyn Input>>, Vec<f32>) {
        if input_obj.to_param().shape() != self.layers[0].shape() {
            panic!("Input shape does not match input layer shape\nInput: {:?}\nInput layer: {:?}", input_obj.to_param().shape(), self.layers[0].shape());
        }
let mut data_at: Box<dyn Input> = Box::new(input_obj.to_param());
let mut res: Vec<Box<dyn Input>> = vec![data_at.to_box()];
for i in 0..self.layers.len(){
data_at = self.layers[i].forward(&data_at);
res.push(data_at.to_box())
}
(res, output.clone())
}
    fn back_propagate(&mut self, outputs: &Vec<f32>, target_obj: &Box<dyn Input>, loss: &ErrorTypes) {
let mut gradients: Box<dyn Input>;
let actual: Box<dyn Input> = Box::new(outputs.clone());
let mut errors: Box<dyn Input> = loss.get_error(&actual, target_obj, 1);
for i in (1..self.layers.len()).rev() {
gradients = self.layers[i].update_gradient();
let data_box: Box<dyn Input> = self.layers[i - 1].get_data();
errors = self.layers[i].backward(gradients, errors, data_box);
}
}
    async fn back_propagate_async(&self, data: Vec<Box<dyn Input>>, output: Vec<f32>) -> Vec<GradientPair> {
let mut res = vec![];
        let parsed = Matrix::from(data[data.len() - 1].to_param_2d());
        if self.layers[self.layers.len() - 1].get_activation().is_none() {
            panic!("Output layer is not a dense layer");
        }
let mut errors: Box<dyn Input> = Box::new(Matrix::from(output.to_param_2d()).transpose() - &parsed);
for i in (0..self.layers.len()).rev() {
res.push(self.layers[i].get_gradients(&data[i + 1], &data[i], &errors));
errors = self.layers[i].update_errors(errors);
}
res.reverse();
res
}
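    /// Trains with plain gradient descent: each epoch sweeps all batches
    /// `iterations_per_epoch` times, feeding forward, back-propagating with
    /// `error_fn`, and recording the mean squared error of each epoch in
    /// `loss_train`.
    ///
    /// A sketch of an XOR-style run; the exact `ErrorTypes` variant name is an
    /// assumption:
    ///
    /// ```ignore
    /// let inputs: Vec<Vec<f32>> = vec![
    ///     vec![0.0, 0.0], vec![0.0, 1.0], vec![1.0, 0.0], vec![1.0, 1.0],
    /// ];
    /// let outputs = vec![vec![0.0], vec![1.0], vec![1.0], vec![0.0]];
    /// let train_in: Vec<&dyn Input> = inputs.iter().map(|v| v as &dyn Input).collect();
    /// net.fit(&train_in, &outputs, 10, ErrorTypes::MeanAbsolute);
    /// ```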
pub fn fit(&mut self, train_in: &Vec<&dyn Input>, train_out: &Vec<Vec<f32>>, epochs: usize, error_fn: ErrorTypes) {
self.loss_train = vec![];
let mut loss: f32;
        // round up so a trailing partial batch still gets trained on
        let num_batches = (train_in.len() + self.batch_size - 1) / self.batch_size;
        let mut iterations_per_epoch: usize = 40;
        if train_in.len() < ITERATIONS_PER_EPOCH {
            // use float math so the ratio isn't truncated to an integer before ceil()
            let iteration_scale_factor = ITERATIONS_PER_EPOCH as f32 / train_in.len() as f32;
            iterations_per_epoch = (iteration_scale_factor * 40.0).ceil() as usize;
        }
if self.log {
println!("{}", iterations_per_epoch);
}
let iterations_divided_even = iterations_per_epoch / 40;
for epoch in 0..epochs {
            if self.log {
                print!("Epoch {}: [", epoch + 1);
                let _ = io::stdout().flush();
            }
loss = 0.0;
for iteration in 0..iterations_per_epoch {
                if iteration % iterations_divided_even == 0 && self.log {
                    print!("=");
                    let _ = io::stdout().flush();
                }
for batch_index in 0..num_batches {
                    let start = batch_index * self.batch_size;
                    let end = (start + self.batch_size).min(train_in.len());
let mut batch_loss: f32 = 0.0;
for input_index in start..end {
let mut loss_on_input: f32 = 0.0;
let input: Box<dyn Input> = train_in[input_index].to_box();
let output: Box<dyn Input> = Box::new(train_out[input_index].clone());
let outputs = self.feed_forward(&input);
                        self.back_propagate(&outputs, &output, &error_fn);
for i in 0..outputs.len() {
loss_on_input += (outputs[i] - train_out[input_index].to_param()[i]).powi(2);
}
batch_loss += loss_on_input / outputs.len() as f32;
}
                    loss += batch_loss / (end - start) as f32;
}
}
self.loss_train.push(loss / (iterations_per_epoch * num_batches) as f32);
if self.log {
println!("] Loss: {}", self.loss_train[self.loss_train.len()-1]);
}
}
self.loss = self.loss_train[self.loss_train.len() - 1];
if self.log {
println!("Trained to a loss of {:.2}%", self.loss * 100.0);
            for i in 0..self.layers.len().saturating_sub(1) {
println!("Error on layer {}: +/- {:.2}", i + 1, self.layers[i].get_loss());
}
}
}
    fn update_gradients(&mut self, gradient_pairs: &(Vec<Box<dyn Input>>, Vec<Box<dyn Input>>)) {
        if gradient_pairs.0.len() != self.layers.len() {
            panic!("Gradients length not equal to number of layers:\nGradients: {}\nLayers: {}",
                gradient_pairs.0.len(),
                self.layers.len());
        }
        for i in 0..self.layers.len() {
            self.layers[i].update_gradients((&gradient_pairs.0[i], &gradient_pairs.1[i]), None);
        }
    }
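    /// Asynchronous minibatch training: shuffles the training set into
    /// minibatches, computes every batch's averaged gradient concurrently via
    /// `get_minibatch_gradient`, and applies each result with
    /// `update_gradients`. The returned future must be driven by an async
    /// runtime.
    ///
    /// A sketch, assuming a tokio-style executor is already running:
    ///
    /// ```ignore
    /// net.fit_minibatch(&train_in, &outputs, 10).await;
    /// ```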
pub async fn fit_minibatch(&mut self, train_in: &Vec<&dyn Input>, train_out: &Vec<Vec<f32>>, epochs: usize) {
        print!("[");
        let _ = io::stdout().flush();
for _ in 1..=epochs {
let minibatches: Vec<Vec<(Box<dyn Input>, Vec<f32>)>> =
self.generate_minibatches(train_in.clone(), train_out.clone());
let len = minibatches.len();
            print!("#");
            let _ = io::stdout().flush();
let all_gradients = futures::stream::iter(&minibatches)
.map(|batch| self.get_minibatch_gradient(batch))
.buffer_unordered(len)
                .collect::<Vec<_>>();
let res = all_gradients.await;
for gradient_pair in res.iter() {
                self.update_gradients(gradient_pair);
}
}
println!("]");
}
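    /// Draws samples without replacement (using the seeded RNG when one is
    /// set) until the training set is exhausted; the final minibatch may be
    /// smaller than `batch_size`.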
    fn generate_minibatches(&self, mut inputs: Vec<&dyn Input>, mut outputs: Vec<Vec<f32>>) -> Vec<Vec<(Box<dyn Input>, Vec<f32>)>> {
let mut res = vec![];
let mut rng = self.get_rng();
let mut minibatch: Vec<(Box<dyn Input>, Vec<f32>)>;
let mut iterations: usize;
        while !inputs.is_empty() {
minibatch = vec![];
iterations = inputs.len().min(self.batch_size);
for _ in 0..iterations {
let location = rng.gen_range(0..inputs.len());
minibatch.push((inputs[location].to_box(), outputs[location].clone()));
inputs.remove(location);
outputs.remove(location);
}
res.push(minibatch);
}
res
}
fn get_rng(&self) -> Box<dyn RngCore> {
match &self.seed {
Some(seed_rng) => Box::new(Seeder::from(seed_rng).make_rng::<Pcg64>()),
None => Box::new(rand::thread_rng())
}
}
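    /// Serializes the whole network as JSON at `path`. The RNG is skipped
    /// during serialization and rebuilt when the network is loaded.
    ///
    /// A round-trip sketch (the file name is only illustrative):
    ///
    /// ```ignore
    /// net.save("model.json");
    /// let restored = Network::load("model.json");
    /// ```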
pub fn save(&self, path: &str) {
let mut file = File::create(path).expect("Unable to hit save file :(");
let file_ser = to_string(self).expect("Unable to serialize network :(((");
        file.write_all(file_ser.as_bytes()).expect("Write failed :(");
}
pub fn save_cbor(&self, path: &str) {
let res_file = File::create(path).expect("Unable to save file");
serde_cbor::to_writer(res_file, self).expect("Unable to write or compile cbor");
}
pub fn load(path: &str) -> Network{
let mut buffer = String::new();
let mut file = File::open(path).expect("Unable to read file :(");
file.read_to_string(&mut buffer).expect("Unable to read file but even sadder :(");
let mut net: Network = from_str(&buffer).expect("Json was not formatted well >:(");
net.rng = net.get_rng();
net
}
pub fn load_cbor(path: &str) -> Result<Network, serde_cbor::Error> {
let file = File::open(path).expect("error loading file");
let mut network: Network = serde_cbor::from_reader(file)?;
network.rng = network.get_rng();
Ok(network)
}
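    /// In-memory CBOR round trip, handy when a network has to cross a process
    /// or FFI boundary without touching the filesystem:
    ///
    /// ```ignore
    /// let bytes = net.to_vec()?;
    /// let restored = Network::from_vec(bytes)?;
    /// ```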
pub fn to_vec(&self) -> Result<Vec<u8>, serde_cbor::Error> {
serde_cbor::to_vec(self)
}
pub fn from_vec(data: Vec<u8>) -> Result<Network, serde_cbor::Error> {
serde_cbor::from_slice(&data[..])
}
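    /// Writes the network in unda's own `#`-separated layer format: each layer
    /// is rendered through `SerializedLayer` and the segments are joined with
    /// `#`, which is exactly what `deserialize_unda_fmt_string` splits on.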
pub fn serialize_unda_fmt(&self, path: &str) {
let mut str_fmt: Vec<String> = vec![];
for i in 0..self.layers.len() {
let layer_serialized: SerializedLayer = SerializedLayer::new(&self.layers[i], &self.uncompiled_layers[i]);
str_fmt.push(layer_serialized.to_string());
}
fs::write(path, str_fmt.join("#")).expect("Error writing to file");
}
pub fn deserialize_unda_fmt_string(format_string: String) -> Network {
let mut net: Network = Network::new(0);
        let layer_strings = format_string.split('#');
        for layer in layer_strings {
let new_layer: Box<dyn Layer> = SerializedLayer::from_string(layer.to_string()).from();
net.layers.push(new_layer);
}
net
}
}
#[cfg(test)]
mod test {
use crate::core::layer::layers::InputTypes;
use super::Network;
#[test]
fn check_set_input() {
let mut net: Network = Network::new(10);
net.set_input(InputTypes::DENSE(10));
net.set_input(InputTypes::DENSE(20));
assert_eq!(net.layer_sizes[0], 20);
}
}