use std::{
    borrow::BorrowMut,
    collections::HashMap,
    sync::Arc,
    time::Duration,
    time::Instant
};
use futures::{
    future::Future,
    Stream,
    sync::mpsc::{
        channel, Receiver as MpscReceiver, Sender as MpscSender, unbounded,
        UnboundedReceiver, UnboundedSender
    }
};
use parking_lot::{Mutex, RwLock};
use tokio::runtime::current_thread;
use tokio::timer::Delay;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use spectacles_model::gateway::{GatewayBot, GatewayEvent, Opcodes};
use crate::{
    constants::API_BASE,
    errors::Error,
    queue::{MessageSink, MessageSinkError, ReconnectQueue, ShardQueue},
    shard::{Shard, ShardAction}
};
use crate::EventHandler;
use crate::ManagerOptions;
#[derive(Clone)]
pub enum ShardStrategy {
    
    Recommended,
    
    SpawnAmount(usize)
}
pub struct ShardMessage {
}
pub type ShardMap = HashMap<usize, Arc<Mutex<Shard>>>;
pub type MessageStream = UnboundedReceiver<(Arc<Mutex<Shard>>, TungsteniteMessage)>;
pub struct ShardManager<H: EventHandler + Send + Sync + 'static> {
    
    pub token: String,
    
    pub options: Option<ManagerOptions<H>>,
    
    pub shards: Arc<RwLock<ShardMap>>,
    _spawn_amount: usize,
    queue_sender: MpscSender<usize>,
    queue_receiver: Option<MpscReceiver<usize>>,
    reconnect_queue: ShardQueue,
    message_stream: Option<MessageStream>,
}
impl <H: EventHandler + Send + Sync + 'static> ShardManager<H> {
    
    pub fn new(token: String, options: ManagerOptions<H>) -> impl Future<Item = ShardManager<H>, Error = Error> {
        let token = if token.starts_with("Bot ") {
            token
        } else {
            format!("Bot {}", token)
        };
        let (queue_sender, queue_receiver) = channel(0);
        use reqwest::r#async::Client;
        Client::new().get(format!("{}/gateway/bot", API_BASE).as_str())
            .header("Authorization", token.clone()).send()
            .and_then(|mut resp| resp.json::<GatewayBot>())
            .map_err(Error::from)
            .map(|gb| {
                let shard_count = match options.strategy {
                    ShardStrategy::Recommended => gb.shards,
                    ShardStrategy::SpawnAmount(int) => int
                };
                Self {
                    token,
                    reconnect_queue: ShardQueue::new(shard_count),
                    queue_sender,
                    options: Some(options),
                    queue_receiver: Some(queue_receiver),
                    message_stream: None,
                    _spawn_amount: shard_count,
                    shards: Arc::new(RwLock::new(HashMap::new())),
                }
        })
    }
    
    pub fn begin_spawn(mut self) {
        info!("Bootstrapping ShardManager.");
        let shard_count = self._spawn_amount.clone();
        debug!("Attempting to spawn {} shards.", &shard_count);
        for i in 0..shard_count {
            trace!("Sending shard {} to queue.", &i);
            tokio::spawn(self.reconnect_queue.push_back(i)
                .map_err(move |_| error!("Failed to place Shard {} into reconnect queue.", i))
            );
        }
        let (sender, receiver) = unbounded();
        self.message_stream = Some(receiver);
        let mut queue_sender = self.queue_sender.clone();
        let receiver = self.queue_receiver.take().unwrap();
        let token = self.token.clone();
        let newsender = sender.clone();
        let shards = self.shards.clone();
        let message_stream = self.message_stream.take().unwrap();
        let opts = self.options.take().unwrap();
        current_thread::spawn(self.reconnect_queue.pop_front()
            .and_then(move |shard| {
                let shard = shard.expect("Shard queue is empty.");
                queue_sender.try_send(shard).expect("Failed to send starting shard.");
                futures::future::ok(())
            })
            .map_err(|_| error!("Failed to pop shard in reconnect queue."))
            .and_then(move |_| receive_chan(
                receiver,
                token,
                shard_count,
                newsender,
                shards,
            ))
        );
        tokio::spawn(message_stream.for_each(move |(mut shard, message)| {
            let current_shard = shard.borrow_mut();
            let mut shard = current_shard.lock().clone();
            trace!("Websocket message received: {:?}", &message.clone());
            let event = shard.resolve_packet(&message).expect("Failed to parse the shard message.");
            if let Opcodes::Dispatch = event.op {
                tokio::spawn({
                    opts.handler.on_packet(&mut shard, event.clone());
                    futures::future::ok(())
                });
            }
            let action = shard.fulfill_gateway(event.clone());
            match action {
                Ok(a) => match a {
                    ShardAction::Autoreconnect => {
                        current_thread::spawn(shard.autoreconnect().map_err({
                            let shard = shard.info[0].clone();
                            move |err| {
                                error!("Shard {} failed to autoreconnect. {}", shard, err);
                            }
                        }));
                    },
                    ShardAction::Identify => {
                        debug!("[Shard {}] Identifying with the gateway.", &shard.info[0]);
                        if let Err(e) = shard.identify() {
                            warn!("[Shard {}] Failed to identify with gateway. {:?}", &shard.info[0], e);
                        };
                    },
                    ShardAction::Reconnect => {
                        shard.reconnect();
                        info!("[Shard {}] Reconnection successful.", &shard.info[0]);
                    },
                    ShardAction::Resume => {
                        shard.resume();
                        info!("[Shard {}] Successfully resumed session.", &shard.info[0]);
                    },
                    _ => {}
                },
                Err(e) => {
                    error!("Failed to perform action for Shard {}. {}", &shard.info[0], e);
                },
            };
            if let Some(GatewayEvent::READY) = event.t {
                &(self).queue();
                tokio::spawn({
                    opts.handler.on_shard_ready(&mut shard);
                    futures::future::ok(())
                });
            };
            futures::future::ok(())
        }));
    }
    fn queue(&mut self) {
        current_thread::spawn({
            self.reconnect_queue.pop_front()
                .and_then({
                    let mut sender = self.queue_sender.clone();
                    move |id| {
                        if let Some(next) = id {
                            if let Err(e) = sender.try_send(next) {
                                error!("Could not place shard ID in queue. {:?}", e);
                            }
                        }
                        futures::future::ok(())
                    }
                })
        });
    }
    
}
fn receive_chan(
    receiver: MpscReceiver<usize>,
    token: String,
    shardcount: usize,
    sender: UnboundedSender<(Arc<Mutex<Shard>>, TungsteniteMessage)>,
    shardmap: Arc<RwLock<ShardMap>>
) -> impl Future<Item = (), Error = ()> {
    receiver.for_each(move |shard_id| {
        debug!("Received notification to shart Shard {}.", shard_id);
        let shardmap = shardmap.clone();
        let token = token.clone();
        let sender = sender.clone();
        Delay::new(Instant::now() + Duration::from_secs(5))
            .map_err(|err| error!("Failed to pause before spawning the next shard. {:?}", err))
            .and_then(move |_| {
                current_thread::spawn(create_shard(token, [shard_id, shardcount], sender)
                    .map(move |shard| {
                        shardmap.write().insert(shard_id, shard);
                        info!("Shard {} has been successfully spawned.", shard_id);
                    })
                );
                futures::future::ok(())
            })
    }).map_err(|_| ())
}
fn create_shard(
    token: String,
    info: [usize; 2],
    sender: UnboundedSender<(Arc<Mutex<Shard>>, TungsteniteMessage)>,
) -> impl Future<Item = Arc<Mutex<Shard>>, Error = ()> {
    Shard::new(token.clone(), info)
        .map_err(move |err| error!("Failed to start Shard {}. {:?}", info[0], err))
        .and_then(|res| {
            let shard = Arc::new(Mutex::new(res));
            let sink = MessageSink {
                shard: shard.clone(),
                sender,
            };
            tokio::spawn(Box::new(shard.lock()
                .stream.lock().take()
                .unwrap()
                .map_err(MessageSinkError::from)
                .forward(sink)
                .map(|_| ())
                .map_err(|e| error!("Failed to forward shard messages to the sink. {:?}", e)))
            );
            futures::future::ok(shard)
        })
}