1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::net::SocketAddr;

use colorful::Colorful;
use tokio::{sync::Mutex, try_join};

use ockam::Context;
use ockam_api::nodes::models::services::{StartKafkaDirectRequest, StartServiceRequest};
use ockam_api::port_range::PortRange;
use ockam_core::api::Request;
use ockam_multiaddr::MultiAddr;

use crate::node::{get_node_name, NodeOpts};
use crate::service::start::start_service_impl;
use crate::terminal::OckamColor;
use crate::util::{process_nodes_multiaddr, Rpc};
use crate::{display_parse_logs, fmt_log, fmt_ok, CommandGlobalOpts};

/// Arguments consumed by [`start`] to request a Kafka service on a node.
pub struct ArgOpts {
    /// Path the start request is POSTed to (`Request::post(endpoint)`).
    pub endpoint: String,
    /// Name of the Kafka entity; used in terminal messages and passed to
    /// `start_service_impl`.
    pub kafka_entity: String,
    /// Node selection options; `at_node` is used to resolve the target node name.
    pub node_opts: NodeOpts,
    /// Address given to `StartServiceRequest::new` alongside the Kafka payload.
    pub addr: String,
    /// Socket address forwarded in the Kafka request and reported to the user
    /// as where the service was started.
    pub bind_address: SocketAddr,
    /// Port range forwarded in the Kafka request and reported to the user as
    /// the brokers port range.
    pub brokers_port_range: PortRange,
    /// Optional route; when present it is passed through
    /// `process_nodes_multiaddr` before being put in the request.
    pub consumer_route: Option<MultiAddr>,
    /// Socket address of the bootstrap server, forwarded in the Kafka request.
    pub bootstrap_server: SocketAddr,
}

pub async fn start(ctx: Context, (opts, args): (CommandGlobalOpts, ArgOpts)) -> miette::Result<()> {
    let ArgOpts {
        endpoint,
        kafka_entity,
        node_opts,
        addr,
        bind_address,
        brokers_port_range,
        consumer_route,
        bootstrap_server,
    } = args;

    opts.terminal
        .write_line(&fmt_log!("Creating {} service...\n", kafka_entity))?;

    display_parse_logs(&opts);

    let consumer_route = if let Some(consumer_route) = consumer_route {
        Some(process_nodes_multiaddr(&consumer_route, &opts.state)?)
    } else {
        None
    };

    let is_finished = Mutex::new(false);
    let send_req = async {
        let node_name = get_node_name(&opts.state, &node_opts.at_node);
        let mut rpc = Rpc::background(&ctx, &opts, &node_name).await?;

        let payload = StartKafkaDirectRequest::new(
            bind_address.to_owned(),
            bootstrap_server,
            brokers_port_range,
            consumer_route,
        );
        let payload = StartServiceRequest::new(payload, &addr);
        let req = Request::post(endpoint).body(payload);
        start_service_impl(&mut rpc, &kafka_entity, req).await?;

        *is_finished.lock().await = true;

        Ok::<_, crate::Error>(())
    };

    let msgs = vec![
        format!(
            "Building {} service {}",
            kafka_entity,
            &addr.to_string().color(OckamColor::PrimaryResource.color())
        ),
        format!(
            "Creating {} service at {}",
            kafka_entity,
            &bind_address
                .to_string()
                .color(OckamColor::PrimaryResource.color())
        ),
        format!(
            "Setting brokers port range to {}",
            &brokers_port_range
                .to_string()
                .color(OckamColor::PrimaryResource.color())
        ),
    ];
    let progress_output = opts.terminal.progress_output(&msgs, &is_finished);
    let (_, _) = try_join!(send_req, progress_output)?;

    opts.terminal
        .stdout()
        .plain(
            fmt_ok!(
                "{} service started at {}\n",
                kafka_entity,
                &bind_address
                    .to_string()
                    .color(OckamColor::PrimaryResource.color())
            ) + &fmt_log!(
                "Brokers port range set to {}",
                &brokers_port_range
                    .to_string()
                    .color(OckamColor::PrimaryResource.color())
            ),
        )
        .write_line()?;

    Ok(())
}