use anyhow::{bail, Context, Result};
use comfy_table::{
modifiers::UTF8_ROUND_CORNERS, presets::UTF8_FULL, Attribute, Cell, Color, Table,
};
use reqwest::Client;
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, info, warn};
use crate::build::{self, BuildOptions};
use crate::config::Config;
pub use crate::api::models::{Deployment, DeploymentStatus};
pub(super) fn parse_duration(s: &str) -> Result<Duration> {
let s = s.trim();
if s.is_empty() {
bail!("Duration string is empty");
}
let (num_str, unit) = if let Some(num_str) = s.strip_suffix("ms") {
(num_str, "ms")
} else {
let num_str = &s[..s.len() - 1];
let unit = &s[s.len() - 1..];
(num_str, unit)
};
let num: u64 = num_str.parse().context("Invalid duration number")?;
let duration = match unit {
"ms" => Duration::from_millis(num),
"s" => Duration::from_secs(num),
"m" => Duration::from_secs(num * 60),
"h" => Duration::from_secs(num * 3600),
_ => bail!("Invalid duration unit '{}'. Use ms, s, m, or h", unit),
};
Ok(duration)
}
pub async fn fetch_deployment(
http_client: &Client,
backend_url: &str,
token: &str,
project: &str,
deployment_id: &str,
) -> Result<Deployment> {
let url = format!(
"{}/api/v1/projects/{}/deployments/{}",
backend_url, project, deployment_id
);
let response = http_client
.get(&url)
.bearer_auth(token)
.send()
.await
.context("Failed to fetch deployment")?;
if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
bail!("Failed to fetch deployment ({}): {}", status, error_text);
}
let deployment: Deployment = response
.json()
.await
.context("Failed to parse deployment response")?;
Ok(deployment)
}
pub async fn list_deployments(
http_client: &Client,
backend_url: &str,
config: &Config,
project: &str,
group: Option<&str>,
limit: usize,
) -> Result<()> {
let token = config
.get_token()
.ok_or_else(|| anyhow::anyhow!("Not logged in. Please run 'rise login' first."))?;
if let Some(g) = group {
info!(
"Listing deployments for project '{}' in group '{}'",
project, g
);
} else {
info!("Listing deployments for project '{}'", project);
}
let mut url = format!("{}/api/v1/projects/{}/deployments", backend_url, project);
if let Some(g) = group {
url = format!("{}?group={}", url, urlencoding::encode(g));
}
let response = http_client
.get(&url)
.bearer_auth(token)
.send()
.await
.context("Failed to list deployments")?;
if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
bail!("Failed to list deployments ({}): {}", status, error_text);
}
let mut deployments: Vec<Deployment> = response
.json()
.await
.context("Failed to parse deployments")?;
deployments.truncate(limit);
if deployments.is_empty() {
println!("No deployments found for project '{}'", project);
return Ok(());
}
let mut active_per_group = std::collections::HashMap::new();
for deployment in &deployments {
if deployment.status == DeploymentStatus::Healthy {
active_per_group.insert(
deployment.deployment_group.clone(),
deployment.deployment_id.clone(),
);
}
}
let default_active = active_per_group.get("default");
let mut table = Table::new();
table
.load_preset(UTF8_FULL)
.apply_modifier(UTF8_ROUND_CORNERS)
.set_header(vec![
Cell::new("DEPLOYMENT").add_attribute(Attribute::Bold),
Cell::new("STATUS").add_attribute(Attribute::Bold),
Cell::new("CREATED BY").add_attribute(Attribute::Bold),
Cell::new("IMAGE").add_attribute(Attribute::Bold),
Cell::new("GROUP").add_attribute(Attribute::Bold),
Cell::new("EXPIRY").add_attribute(Attribute::Bold),
Cell::new("CREATED").add_attribute(Attribute::Bold),
Cell::new("URL").add_attribute(Attribute::Bold),
Cell::new("ERROR").add_attribute(Attribute::Bold),
]);
for deployment in deployments {
let deployment_display = deployment.deployment_id.clone();
let url = if deployment.status == DeploymentStatus::Healthy {
deployment.primary_url.as_deref().unwrap_or("-")
} else {
"-"
};
let created = if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&deployment.created) {
dt.format("%Y-%m-%d %H:%M:%S").to_string()
} else {
deployment.created.clone()
};
let expiry = if let Some(expires_at) = &deployment.expires_at {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(expires_at) {
dt.format("%Y-%m-%d %H:%M:%S").to_string()
} else {
expires_at.to_string()
}
} else {
"-".to_string()
};
let is_active_in_group =
active_per_group.get(&deployment.deployment_group) == Some(&deployment.deployment_id);
let is_default_active = default_active == Some(&deployment.deployment_id);
let image_display = deployment.image.as_deref().unwrap_or("-");
let mut deployment_cell = Cell::new(&deployment_display);
let mut status_cell = Cell::new(deployment.status.to_string());
let mut created_by_cell = Cell::new(&deployment.created_by_email);
let mut image_cell = Cell::new(image_display);
let mut group_cell = Cell::new(&deployment.deployment_group);
let mut expiry_cell = Cell::new(&expiry);
let mut created_cell = Cell::new(&created);
let mut url_cell = Cell::new(url);
if is_default_active {
deployment_cell = deployment_cell.add_attribute(Attribute::Bold);
status_cell = status_cell.add_attribute(Attribute::Bold);
created_by_cell = created_by_cell.add_attribute(Attribute::Bold);
image_cell = image_cell.add_attribute(Attribute::Bold);
group_cell = group_cell.add_attribute(Attribute::Bold);
expiry_cell = expiry_cell.add_attribute(Attribute::Bold);
created_cell = created_cell.add_attribute(Attribute::Bold);
url_cell = url_cell.add_attribute(Attribute::Bold);
}
if is_active_in_group {
deployment_cell = deployment_cell.fg(Color::Green);
status_cell = status_cell.fg(Color::Green);
}
let error_cell = if let Some(ref error) = deployment.error_message {
let truncated: String = if error.len() > 40 {
format!("{}...", &error[..37])
} else {
error.clone()
};
Cell::new(truncated).fg(Color::Red)
} else {
Cell::new("-")
};
table.add_row(vec![
deployment_cell,
status_cell,
created_by_cell,
image_cell,
group_cell,
expiry_cell,
created_cell,
url_cell,
error_cell,
]);
}
println!("{}", table);
Ok(())
}
pub async fn show_deployment(
http_client: &Client,
backend_url: &str,
config: &Config,
project: &str,
deployment_id: &str,
follow: bool,
timeout_str: &str,
) -> Result<()> {
if follow {
let deployment = super::follow_ui::follow_deployment_with_ui(
http_client,
backend_url,
config,
project,
deployment_id,
timeout_str,
)
.await?;
if deployment.status == DeploymentStatus::Failed {
if let Some(error) = deployment.error_message {
bail!("Deployment failed: {}", error);
} else {
bail!("Deployment failed");
}
}
Ok(())
} else {
let token = config
.token
.as_ref()
.context("Not logged in. Please run 'rise login' first.")?;
let deployment =
fetch_deployment(http_client, backend_url, token, project, deployment_id).await?;
super::follow_ui::print_deployment_snapshot(&deployment);
if deployment.status == DeploymentStatus::Failed {
if let Some(error) = deployment.error_message {
bail!("Deployment failed: {}", error);
} else {
bail!("Deployment failed");
}
}
Ok(())
}
}
#[derive(Debug, Deserialize)]
struct StopDeploymentsResponse {
stopped_count: usize,
deployment_ids: Vec<String>,
}
pub async fn stop_deployments_by_group(
http_client: &Client,
backend_url: &str,
config: &Config,
project: &str,
group: &str,
) -> Result<()> {
let token = config
.get_token()
.ok_or_else(|| anyhow::anyhow!("Not logged in. Please run 'rise login' first."))?;
info!(
"Stopping deployments in group '{}' for project '{}'",
group, project
);
let url = format!(
"{}/api/v1/projects/{}/deployments/stop?group={}",
backend_url,
project,
urlencoding::encode(group)
);
let response = http_client
.post(&url)
.bearer_auth(token)
.send()
.await
.context("Failed to stop deployments")?;
if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
bail!("Failed to stop deployments ({}): {}", status, error_text);
}
let stop_response: StopDeploymentsResponse = response
.json()
.await
.context("Failed to parse stop response")?;
if stop_response.stopped_count == 0 {
println!("No running deployments found in group '{}'", group);
} else {
println!(
"✓ Stopped {} deployment(s) in group '{}':",
stop_response.stopped_count, group
);
for deployment_id in &stop_response.deployment_ids {
println!(" - {}", deployment_id);
}
}
Ok(())
}
#[derive(Debug, Deserialize)]
struct RegistryCredentials {
registry_url: String,
username: String,
password: String,
#[allow(dead_code)]
expires_in: Option<u64>,
}
#[derive(Debug, Deserialize)]
struct CreateDeploymentResponse {
deployment_id: String,
image_tag: String,
credentials: RegistryCredentials,
}
pub struct DeploymentOptions<'a> {
pub project_name: &'a str,
pub path: &'a str,
pub image: Option<&'a str>,
pub group: Option<&'a str>,
pub expires_in: Option<&'a str>,
pub http_port: Option<u16>,
pub build_args: &'a build::BuildArgs,
pub from_deployment: Option<&'a str>,
pub use_source_env_vars: bool,
pub push_image: bool,
}
pub async fn create_deployment(
http_client: &Client,
backend_url: &str,
config: &Config,
deploy_opts: DeploymentOptions<'_>,
) -> Result<()> {
if let Some(from_deployment_id) = deploy_opts.from_deployment {
info!(
"Creating deployment for project '{}' from deployment '{}' with {} environment variables",
deploy_opts.project_name,
from_deployment_id,
if deploy_opts.use_source_env_vars { "source" } else { "current project" }
);
} else if let Some(image_ref) = deploy_opts.image {
info!(
"Deploying project '{}' with pre-built image '{}'",
deploy_opts.project_name, image_ref
);
} else {
info!(
"Deploying project '{}' from path '{}'",
deploy_opts.project_name, deploy_opts.path
);
}
let token = config
.get_token()
.ok_or_else(|| anyhow::anyhow!("Not authenticated. Run 'rise login' first."))?;
info!(
"Creating deployment for project '{}'",
deploy_opts.project_name
);
let deployment_info = call_create_deployment_api(
http_client,
backend_url,
&token,
deploy_opts.project_name,
deploy_opts.image,
deploy_opts.group,
deploy_opts.expires_in,
deploy_opts.http_port,
deploy_opts.from_deployment,
deploy_opts.use_source_env_vars,
deploy_opts.push_image,
)
.await?;
info!("Deployment ID: {}", deployment_info.deployment_id);
info!("Image tag: {}", deployment_info.image_tag);
let deployment_id_clone = deployment_info.deployment_id.clone();
let backend_url_clone = backend_url.to_string();
let http_client_clone = http_client.clone();
let token_clone = token.to_string();
tokio::spawn(async move {
if let Ok(()) = tokio::signal::ctrl_c().await {
eprintln!("\n⚠️ Caught Ctrl+C, cancelling deployment...");
if let Err(e) = cancel_deployment(
&http_client_clone,
&backend_url_clone,
&token_clone,
&deployment_id_clone,
)
.await
{
eprintln!("Failed to cancel deployment: {}", e);
} else {
eprintln!("✓ Deployment cancelled");
}
std::process::exit(130); }
});
if deploy_opts.image.is_some() && deploy_opts.push_image {
let source_image = deploy_opts.image.unwrap();
info!(
"Pulling and pushing image '{}' to Rise registry",
source_image
);
let container_cli = match &deploy_opts.build_args.container_cli {
Some(cli) => cli.clone(),
None => config.get_container_cli().command().to_string(),
};
login_to_registry(
http_client,
backend_url,
&token,
&container_cli,
&deployment_info.credentials,
&deployment_info.deployment_id,
)
.await?;
if let Err(e) = build::docker_pull(&container_cli, source_image, "linux/amd64") {
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Failed",
Some(&e.to_string()),
)
.await?;
return Err(e);
}
if let Err(e) = build::docker_tag(&container_cli, source_image, &deployment_info.image_tag)
{
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Failed",
Some(&e.to_string()),
)
.await?;
return Err(e);
}
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Building",
None,
)
.await?;
if let Err(e) = build::docker_push(&container_cli, &deployment_info.image_tag) {
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Failed",
Some(&e.to_string()),
)
.await?;
return Err(e);
}
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Pushed",
None,
)
.await?;
info!(
"✓ Successfully pushed {} to {}",
source_image, deployment_info.image_tag
);
} else if deploy_opts.image.is_some() || deploy_opts.from_deployment.is_some() {
if let Some(from_deployment) = &deploy_opts.from_deployment {
info!(
"✓ Deployment created from existing deployment '{}' with {} environment variables",
from_deployment,
if deploy_opts.use_source_env_vars {
"source"
} else {
"current project"
}
);
} else {
info!("✓ Pre-built image deployment created");
}
} else {
let options = BuildOptions::from_build_args(
config,
deployment_info.image_tag.clone(),
deploy_opts.path.to_string(),
deploy_opts.build_args,
);
login_to_registry(
http_client,
backend_url,
&token,
options.container_cli.command(),
&deployment_info.credentials,
&deployment_info.deployment_id,
)
.await?;
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Building",
None,
)
.await?;
let options = options.with_push(true);
if let Err(e) = build::build_image(options) {
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Failed",
Some(&e.to_string()),
)
.await?;
return Err(e);
}
update_deployment_status(
http_client,
backend_url,
&token,
&deployment_info.deployment_id,
"Pushed",
None,
)
.await?;
info!(
"✓ Successfully pushed {} to {}",
deploy_opts.project_name, deployment_info.image_tag
);
}
info!(" Deployment ID: {}", deployment_info.deployment_id);
println!();
show_deployment(
http_client,
backend_url,
config,
deploy_opts.project_name,
&deployment_info.deployment_id,
true, "10m", )
.await?;
Ok(())
}
async fn login_to_registry(
http_client: &Client,
backend_url: &str,
token: &str,
container_cli: &str,
credentials: &RegistryCredentials,
deployment_id: &str,
) -> Result<()> {
if credentials.username.is_empty() {
return Ok(());
}
info!("Logging into registry");
if let Err(e) = build::docker_login(
container_cli,
&credentials.registry_url,
&credentials.username,
&credentials.password,
) {
update_deployment_status(
http_client,
backend_url,
token,
deployment_id,
"Failed",
Some(&e.to_string()),
)
.await?;
return Err(e);
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn call_create_deployment_api(
http_client: &Client,
backend_url: &str,
token: &str,
project_name: &str,
image: Option<&str>,
group: Option<&str>,
expires_in: Option<&str>,
http_port: Option<u16>,
from_deployment: Option<&str>,
use_source_env_vars: bool,
push_image: bool,
) -> Result<CreateDeploymentResponse> {
let url = format!("{}/api/v1/deployments", backend_url);
let mut payload = serde_json::json!({
"project": project_name,
});
if let Some(port) = http_port {
payload["http_port"] = serde_json::json!(port);
}
if let Some(image_ref) = image {
payload["image"] = serde_json::json!(image_ref);
}
if let Some(group_name) = group {
payload["group"] = serde_json::json!(group_name);
}
if let Some(expiration) = expires_in {
payload["expires_in"] = serde_json::json!(expiration);
}
if let Some(source_deployment_id) = from_deployment {
payload["from_deployment"] = serde_json::json!(source_deployment_id);
payload["use_source_env_vars"] = serde_json::json!(use_source_env_vars);
}
if push_image {
payload["push_image"] = serde_json::json!(true);
}
let response = http_client
.post(&url)
.header("Authorization", format!("Bearer {}", token))
.json(&payload)
.send()
.await
.context("Failed to create deployment")?;
if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
bail!("Failed to create deployment ({}): {}", status, error_text);
}
let deployment_info: CreateDeploymentResponse = response
.json()
.await
.context("Failed to parse deployment response")?;
Ok(deployment_info)
}
async fn cancel_deployment(
http_client: &Client,
backend_url: &str,
token: &str,
deployment_id: &str,
) -> Result<()> {
let url = format!(
"{}/api/v1/deployments/{}/status",
backend_url, deployment_id
);
let payload = serde_json::json!({
"status": "Cancelling"
});
let response = http_client
.patch(&url)
.bearer_auth(token)
.json(&payload)
.send()
.await
.context("Failed to cancel deployment")?;
if !response.status().is_success() {
let status = response.status();
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
bail!("Failed to cancel deployment ({}): {}", status, error_text);
}
Ok(())
}
async fn update_deployment_status(
http_client: &Client,
backend_url: &str,
token: &str,
deployment_id: &str,
status: &str,
error_message: Option<&str>,
) -> Result<()> {
let url = format!(
"{}/api/v1/deployments/{}/status",
backend_url, deployment_id
);
let mut payload = serde_json::json!({
"status": status,
});
if let Some(error) = error_message {
payload["error_message"] = serde_json::json!(error);
}
debug!("Updating deployment {} status to {}", deployment_id, status);
let response = http_client
.patch(&url)
.header("Authorization", format!("Bearer {}", token))
.json(&payload)
.send()
.await;
match response {
Ok(resp) if resp.status().is_success() => Ok(()),
Ok(resp) => {
let status = resp.status();
let error = resp.text().await.unwrap_or_else(|_| "Unknown".to_string());
warn!("Failed to update deployment status: {} - {}", status, error);
Ok(())
}
Err(e) => {
warn!("Failed to update deployment status: {}", e);
Ok(())
}
}
}
pub struct GetLogsParams<'a> {
pub project: &'a str,
pub deployment_id: &'a str,
pub follow: bool,
pub tail: Option<usize>,
pub timestamps: bool,
pub since: Option<&'a str>,
}
pub async fn get_logs(
http_client: &reqwest::Client,
backend_url: &str,
token: &str,
params: GetLogsParams<'_>,
) -> anyhow::Result<()> {
use futures::StreamExt;
let mut url = format!(
"{}/api/v1/projects/{}/deployments/{}/logs",
backend_url, params.project, params.deployment_id
);
let mut query_params = vec![];
let tail_param;
let since_param;
if params.follow {
query_params.push("follow=true");
}
if let Some(t) = params.tail {
tail_param = format!("tail={}", t);
query_params.push(&tail_param);
}
if params.timestamps {
query_params.push("timestamps=true");
}
if let Some(s) = params.since {
let seconds = parse_duration_to_seconds(s)?;
since_param = format!("since={}", seconds);
query_params.push(&since_param);
}
if !query_params.is_empty() {
url.push('?');
url.push_str(&query_params.join("&"));
}
debug!("Fetching logs from: {}", url);
let response = http_client
.get(&url)
.header("Authorization", format!("Bearer {}", token))
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error = response
.text()
.await
.unwrap_or_else(|_| "Unknown".to_string());
return Err(anyhow::anyhow!(
"Failed to get logs ({}): {}",
status,
error
));
}
let ctrl_c = tokio::signal::ctrl_c();
tokio::pin!(ctrl_c);
let mut stream = response.bytes_stream();
let mut buffer = String::new();
loop {
tokio::select! {
_ = &mut ctrl_c => {
debug!("Received Ctrl+C, stopping log stream");
break;
}
chunk_result = stream.next() => {
match chunk_result {
Some(Ok(chunk)) => {
let text = String::from_utf8_lossy(&chunk);
buffer.push_str(&text);
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer.drain(..=newline_pos).collect::<String>();
let line = line.trim_end();
if let Some(data) = line.strip_prefix("data: ") {
if !data.is_empty() {
println!("{}", data);
}
} else if !line.is_empty() && !line.starts_with(':') {
println!("{}", line);
}
}
}
Some(Err(e)) => {
return Err(anyhow::anyhow!("Stream error: {}", e));
}
None => {
debug!("Log stream ended");
break;
}
}
}
}
}
if !buffer.is_empty() {
let line = buffer.trim();
if let Some(data) = line.strip_prefix("data: ") {
if !data.is_empty() {
println!("{}", data);
}
} else if !line.is_empty() && !line.starts_with(':') {
println!("{}", line);
}
}
Ok(())
}
fn parse_duration_to_seconds(duration: &str) -> anyhow::Result<i64> {
let duration = duration.trim();
if let Some(num_str) = duration.strip_suffix('s') {
let num: i64 = num_str.parse()?;
Ok(num)
} else if let Some(num_str) = duration.strip_suffix('m') {
let num: i64 = num_str.parse()?;
Ok(num * 60)
} else if let Some(num_str) = duration.strip_suffix('h') {
let num: i64 = num_str.parse()?;
Ok(num * 3600)
} else if let Some(num_str) = duration.strip_suffix('d') {
let num: i64 = num_str.parse()?;
Ok(num * 86400)
} else {
Err(anyhow::anyhow!(
"Invalid duration format '{}'. Use format like '5m', '1h', '30s', '7d'",
duration
))
}
}
pub(super) enum LogStreamError {
NotReady,
Gone,
Other(anyhow::Error),
}
impl std::fmt::Debug for LogStreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogStreamError::NotReady => write!(f, "NotReady"),
LogStreamError::Gone => write!(f, "Gone"),
LogStreamError::Other(e) => write!(f, "Other({:?})", e),
}
}
}
pub(super) struct LogStream {
stream: futures::stream::BoxStream<'static, Result<bytes::Bytes, reqwest::Error>>,
buffer: String,
}
impl LogStream {
pub async fn recv(&mut self) -> Option<Result<String>> {
use futures::StreamExt;
loop {
if let Some(newline_pos) = self.buffer.find('\n') {
let line: String = self.buffer.drain(..=newline_pos).collect();
let line = line.trim_end();
if let Some(data) = line.strip_prefix("data: ") {
if !data.is_empty() {
return Some(Ok(data.to_string()));
}
continue;
} else if line.is_empty() || line.starts_with(':') {
continue;
} else {
return Some(Ok(line.to_string()));
}
}
match self.stream.next().await {
Some(Ok(chunk)) => {
let text = String::from_utf8_lossy(&chunk);
self.buffer.push_str(&text);
}
Some(Err(e)) => {
return Some(Err(anyhow::anyhow!("Log stream error: {}", e)));
}
None => {
if !self.buffer.is_empty() {
let remaining = std::mem::take(&mut self.buffer);
let line = remaining.trim();
if let Some(data) = line.strip_prefix("data: ") {
if !data.is_empty() {
return Some(Ok(data.to_string()));
}
} else if !line.is_empty() && !line.starts_with(':') {
return Some(Ok(line.to_string()));
}
}
return None;
}
}
}
}
}
pub(super) async fn open_log_stream(
http_client: &Client,
backend_url: &str,
token: &str,
project: &str,
deployment_id: &str,
tail: usize,
) -> Result<LogStream, LogStreamError> {
use futures::StreamExt;
let url = format!(
"{}/api/v1/projects/{}/deployments/{}/logs?follow=true&tail={}",
backend_url, project, deployment_id, tail
);
let response = http_client
.get(&url)
.bearer_auth(token)
.send()
.await
.map_err(|e| LogStreamError::Other(anyhow::anyhow!("Failed to connect: {}", e)))?;
let status = response.status();
if status == reqwest::StatusCode::SERVICE_UNAVAILABLE {
return Err(LogStreamError::NotReady);
}
if status == reqwest::StatusCode::GONE {
return Err(LogStreamError::Gone);
}
if !status.is_success() {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
return Err(LogStreamError::Other(anyhow::anyhow!(
"Failed to open log stream ({}): {}",
status,
error_text
)));
}
Ok(LogStream {
stream: response.bytes_stream().boxed(),
buffer: String::new(),
})
}