1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use std::sync::{Arc, Weak};

use anyhow::Result;
use tokio::sync::RwLock;

use super::{progress, Host, ResourcePool, ResourceResult, Service};

/// Registry of the hosts and services that are provisioned and deployed as one unit.
///
/// Hosts are held through strong [`Arc`] handles, while services are tracked through
/// [`Weak`] handles: the `Arc` returned by `add_service` is the owning handle, and a
/// service whose handles have all been dropped is removed the next time `deploy` or
/// `start` runs.
#[derive(Default)]
pub struct Deployment {
    /// Hosts taking part in the deployment; `add_host` appends to this list.
    pub hosts: Vec<Arc<RwLock<dyn Host>>>,
    /// Weak handles to the services taking part in the deployment; `add_service`
    /// appends to this list, and `deploy` / `start` prune entries that no longer upgrade.
    pub services: Vec<Weak<RwLock<dyn Service>>>,
    /// Pool handed to the resource batch when it is provisioned in `deploy`.
    pub resource_pool: ResourcePool,
    /// Outcome of the most recent provisioning run; passed back in on the next run.
    last_resource_result: Option<Arc<ResourceResult>>,
    /// ID given to the next host constructor; incremented by every `add_host` call.
    next_host_id: usize,
    /// ID given to the next service constructor; incremented by every `add_service` call.
    next_service_id: usize,
}

impl Deployment {
    /// Provisions resources for all hosts and live services, deploys the services and
    /// waits until every one of them reports ready.
    ///
    /// The phases run in this order:
    /// 1. Services whose owners dropped them are removed from `self.services`.
    /// 2. Every live service, then every host, adds its requirements to one resource batch.
    /// 3. The batch is provisioned against `self.resource_pool`, receiving the result of
    ///    the previous run (if any); the new result is stored for the next run.
    /// 4. All hosts are provisioned concurrently with that result.
    /// 5. All services are deployed concurrently with that result.
    /// 6. All services are awaited concurrently until ready.
    ///
    /// Services that are alive when the call starts are kept alive until it returns, so
    /// an owner dropping its handle mid-deployment cannot invalidate a later phase.
    ///
    /// # Errors
    ///
    /// Returns the error produced while provisioning the resource batch, or the first
    /// error reported by a service's `ready` check.
    pub async fn deploy(&mut self) -> Result<()> {
        // Upgrade once, up front: every phase below works on the same strong handles
        // instead of re-upgrading (and unwrapping) the weak ones across await points.
        let services = self.prune_and_upgrade_services();

        progress::ProgressTracker::with_group("deploy", || async {
            let mut resource_batch = super::ResourceBatch::new();

            for service in &services {
                service.write().await.collect_resources(&mut resource_batch);
            }

            for host in &self.hosts {
                host.write().await.collect_resources(&mut resource_batch);
            }

            // First "provision" group: materialize the batched resources.
            let result = Arc::new(
                progress::ProgressTracker::with_group("provision", || async {
                    resource_batch
                        .provision(&mut self.resource_pool, self.last_resource_result.clone())
                        .await
                })
                .await?,
            );
            self.last_resource_result = Some(result.clone());

            // Second "provision" group: hand the provisioned resources to every host.
            progress::ProgressTracker::with_group("provision", || {
                let hosts_provisioned =
                    self.hosts
                        .iter()
                        .map(|host: &Arc<RwLock<dyn Host>>| async {
                            host.write().await.provision(&result).await;
                        });
                futures::future::join_all(hosts_provisioned)
            })
            .await;

            progress::ProgressTracker::with_group("deploy", || {
                let services_deployed =
                    services
                        .iter()
                        .map(|service: &Arc<RwLock<dyn Service>>| async {
                            service.write().await.deploy(&result).await;
                        });

                futures::future::join_all(services_deployed)
            })
            .await;

            progress::ProgressTracker::with_group("ready", || {
                let all_services_ready =
                    services
                        .iter()
                        .map(|service: &Arc<RwLock<dyn Service>>| async {
                            service.write().await.ready().await?;
                            Ok(()) as Result<()>
                        });

                // Stops at the first service that fails its readiness check.
                futures::future::try_join_all(all_services_ready)
            })
            .await?;

            Ok(())
        })
        .await
    }

    /// Starts every live service concurrently and waits for all of them to finish
    /// starting.
    ///
    /// Services whose owners dropped them are removed from `self.services` first; the
    /// remaining ones are kept alive until the call returns.
    pub async fn start(&mut self) {
        let services = self.prune_and_upgrade_services();

        let all_services_start =
            services
                .iter()
                .map(|service: &Arc<RwLock<dyn Service>>| async {
                    service.write().await.start().await;
                });

        futures::future::join_all(all_services_start).await;
    }

    /// Registers a new host and returns the shared handle to it.
    ///
    /// `host` is called with the ID assigned to the new host (IDs are handed out
    /// sequentially) and must return the host value. The deployment keeps its own strong
    /// handle, so the host stays registered even if the returned handle is dropped.
    pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
        &mut self,
        host: F,
    ) -> Arc<RwLock<T>> {
        let arc = Arc::new(RwLock::new(host(self.next_host_id)));
        self.next_host_id += 1;

        // `arc.clone()` (not `Arc::clone(&arc)`) so the clone can coerce to the trait object.
        self.hosts.push(arc.clone());
        arc
    }

    /// Registers a new service and returns the owning handle to it.
    ///
    /// `service` is called with the ID assigned to the new service (IDs are handed out
    /// sequentially) and must return the service value. The deployment only keeps a weak
    /// handle: once the returned `Arc` and all its clones are dropped, the service is
    /// skipped and forgotten by later `deploy` / `start` calls.
    pub fn add_service<T: Service + 'static>(
        &mut self,
        service: impl FnOnce(usize) -> T,
    ) -> Arc<RwLock<T>> {
        let arc = Arc::new(RwLock::new(service(self.next_service_id)));
        self.next_service_id += 1;

        // Coerce to the trait object first; only a weak handle to it is stored.
        let dyn_arc: Arc<RwLock<dyn Service>> = arc.clone();
        self.services.push(Arc::downgrade(&dyn_arc));
        arc
    }

    /// Removes the weak handles of dropped services and returns strong handles to the
    /// surviving ones, in registration order.
    ///
    /// Doing both in one pass means the returned list matches exactly what remains in
    /// `self.services`.
    fn prune_and_upgrade_services(&mut self) -> Vec<Arc<RwLock<dyn Service>>> {
        let mut live = Vec::with_capacity(self.services.len());
        self.services.retain(|service| match service.upgrade() {
            Some(strong) => {
                live.push(strong);
                true
            }
            None => false,
        });
        live
    }
}