import os
import sys
import json
import subprocess
import shutil
import requests
import tarfile
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import argparse
import time
class OpenSearchManager:
def __init__(self):
self.installations = []
self.es_installations = []
self.os_installations = []
self.default_target = "/opt/prod/opensearch"
def run_command(self, cmd: List[str], capture_output: bool = True) -> Tuple[int, str, str]:
try:
result = subprocess.run(cmd, capture_output=capture_output, text=True)
return result.returncode, result.stdout, result.stderr
except Exception as e:
return 1, "", str(e)
def check_root(self):
if os.geteuid() != 0:
print("Este script precisa ser executado como root")
print("Use: sudo python3 opensearch-manager.py")
sys.exit(1)
def format_size(self, size_bytes: int) -> str:
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f}{unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f}TB"
def get_service_info(self, port: int = 9200) -> Dict:
try:
response = requests.get(f"http://localhost:{port}/", timeout=5)
return response.json()
except:
return {}
def get_cluster_health(self, port: int = 9200) -> Dict:
try:
response = requests.get(f"http://localhost:{port}/_cluster/health", timeout=5)
return response.json()
except:
return {}
def get_indices_info(self, port: int = 9200) -> List[Dict]:
try:
response = requests.get(f"http://localhost:{port}/_cat/indices?format=json", timeout=5)
return response.json()
except:
return []
def find_installations(self):
print("\n=== Procurando instalações ===")
search_paths = [
"/opt/opensearch",
"/opt/elasticsearch",
"/usr/share/opensearch",
"/usr/share/elasticsearch",
"/var/lib/opensearch",
"/var/lib/elasticsearch",
]
for item in Path("/opt").iterdir():
if item.is_dir():
if "opensearch" in item.name.lower():
search_paths.append(str(item))
elif "elasticsearch" in item.name.lower():
search_paths.append(str(item))
for path in search_paths:
if os.path.exists(path):
info = self.analyze_installation(path)
if info:
self.installations.append(info)
if info['type'] == 'elasticsearch':
self.es_installations.append(info)
else:
self.os_installations.append(info)
self.check_running_services()
self.print_installations_summary()
def analyze_installation(self, path: str) -> Dict:
info = {
'path': path,
'type': 'opensearch' if 'opensearch' in path.lower() else 'elasticsearch',
'version': 'unknown',
'data_size': 0,
'config_path': None,
'data_path': None,
'logs_path': None,
'running': False,
'port': 9200
}
config_paths = [
os.path.join(path, 'config'),
'/etc/opensearch',
'/etc/elasticsearch'
]
for config_path in config_paths:
yml_file = os.path.join(config_path, f"{info['type']}.yml")
if os.path.exists(yml_file):
info['config_path'] = yml_file
try:
with open(yml_file, 'r') as f:
for line in f:
if 'http.port:' in line and not line.strip().startswith('#'):
port = line.split(':')[1].strip()
info['port'] = int(port)
except:
pass
break
data_paths = [
os.path.join(path, 'data'),
f"/var/lib/{info['type']}/data"
]
for data_path in data_paths:
if os.path.exists(data_path):
info['data_path'] = data_path
try:
total_size = 0
for dirpath, dirnames, filenames in os.walk(data_path):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
total_size += os.path.getsize(filepath)
info['data_size'] = total_size
except:
pass
break
logs_paths = [
os.path.join(path, 'logs'),
f"/var/log/{info['type']}"
]
for logs_path in logs_paths:
if os.path.exists(logs_path):
info['logs_path'] = logs_path
break
version_files = [
os.path.join(path, 'VERSION'),
os.path.join(path, 'version.txt'),
os.path.join(path, 'lib', f'{info["type"]}-*.jar')
]
for version_file in version_files:
if '*' in version_file:
import glob
files = glob.glob(version_file)
if files:
filename = os.path.basename(files[0])
parts = filename.split('-')
if len(parts) > 1:
info['version'] = parts[1].replace('.jar', '')
break
elif os.path.exists(version_file):
try:
with open(version_file, 'r') as f:
info['version'] = f.read().strip()
break
except:
pass
return info
def check_running_services(self):
services = ['opensearch', 'elasticsearch']
for service in services:
code, stdout, _ = self.run_command(['systemctl', 'is-active', f'{service}.service'])
if code == 0 and 'active' in stdout:
for inst in self.installations:
if inst['type'] == service:
inst['running'] = True
api_info = self.get_service_info(inst['port'])
if api_info and 'version' in api_info:
inst['version'] = api_info['version']['number']
inst['api_info'] = api_info
ports_to_check = [9200, 9201, 9300, 9301]
for port in ports_to_check:
api_info = self.get_service_info(port)
if api_info:
found = False
for inst in self.installations:
if inst['port'] == port or (inst['running'] and 'version' in api_info and api_info['version']['number'] == inst['version']):
inst['running'] = True
inst['port'] = port
inst['api_info'] = api_info
if 'version' in api_info:
inst['version'] = api_info['version']['number']
found = True
break
if not found:
new_inst = {
'path': 'unknown',
'type': 'opensearch' if 'opensearch' in api_info.get('name', '').lower() else 'elasticsearch',
'version': api_info['version']['number'] if 'version' in api_info else 'unknown',
'data_size': 0,
'config_path': None,
'data_path': None,
'logs_path': None,
'running': True,
'port': port,
'api_info': api_info
}
self.installations.append(new_inst)
def print_installations_summary(self):
if not self.installations:
print("\nNenhuma instalação encontrada.")
return
print(f"\nEncontradas {len(self.installations)} instalação(ões):\n")
for i, inst in enumerate(self.installations, 1):
print(f"{i}. {inst['type'].upper()} v{inst['version']}")
print(f" Path: {inst['path']}")
print(f" Status: {'RODANDO' if inst['running'] else 'PARADO'}")
if inst['running']:
print(f" Porta: {inst['port']}")
if inst['data_size'] > 0:
print(f" Dados: {self.format_size(inst['data_size'])}")
if inst['running']:
health = self.get_cluster_health(inst['port'])
if health:
print(f" Cluster: {health.get('cluster_name', 'unknown')} - Status: {health.get('status', 'unknown')}")
print(f" Nodes: {health.get('number_of_nodes', 0)} - Índices: {health.get('active_primary_shards', 0)}")
print()
def show_menu(self):
print("\n=== MENU DE OPÇÕES ===\n")
options = []
if not self.installations:
options.append(("1", "Instalar OpenSearch novo", self.install_new))
else:
option_num = 1
for inst in self.es_installations:
version = inst['version']
if version != 'unknown':
try:
major_version = int(version.split('.')[0])
if major_version <= 7:
options.append((str(option_num), f"Migrar Elasticsearch {version} para OpenSearch", lambda i=inst: self.migrate_es_to_os(i)))
option_num += 1
except:
pass
for inst in self.os_installations:
options.append((str(option_num), f"Mover OpenSearch de {inst['path']} para {self.default_target}", lambda i=inst: self.move_installation(i)))
option_num += 1
options.append((str(option_num), f"Atualizar OpenSearch {inst['version']} para versão mais recente", lambda i=inst: self.upgrade_opensearch(i)))
option_num += 1
options.append((str(option_num), "Fazer backup de uma instalação", self.backup_installation))
option_num += 1
options.append((str(option_num), "Desinstalar uma instalação", self.uninstall))
option_num += 1
options.append((str(option_num), "Instalar OpenSearch novo", self.install_new))
option_num += 1
options.append(("0", "Sair", None))
for opt_num, desc, _ in options:
print(f"{opt_num}. {desc}")
while True:
choice = input("\nEscolha uma opção: ").strip()
for opt_num, desc, func in options:
if choice == opt_num:
if func:
print(f"\n=== {desc} ===")
func()
return choice == "0"
print("Opção inválida. Tente novamente.")
def migrate_es_to_os(self, es_inst: Dict):
print(f"\nMigrando Elasticsearch {es_inst['version']} para OpenSearch...")
print("\n1. Instalando OpenSearch em porta temporária (9201)...")
temp_port = 9201
os_path = self.default_target
if not self.download_and_install_opensearch(os_path, temp_port):
print("Erro na instalação do OpenSearch")
return
print("\n2. Iniciando OpenSearch...")
self.start_opensearch(os_path, temp_port)
time.sleep(10)
es_health = self.get_cluster_health(es_inst['port'])
os_health = self.get_cluster_health(temp_port)
if not es_health or not os_health:
print("Erro: Um dos serviços não está respondendo")
return
print(f"\nElasticsearch: {es_health['cluster_name']} - {es_health['status']}")
print(f"OpenSearch: {os_health['cluster_name']} - {os_health['status']}")
print("\n3. Migrando dados...")
migration_method = input("\nEscolha método de migração:\n1. Reindex (recomendado para dados pequenos)\n2. Snapshot/Restore (recomendado para dados grandes)\nEscolha (1/2): ")
if migration_method == "1":
self.migrate_via_reindex(es_inst['port'], temp_port)
else:
self.migrate_via_snapshot(es_inst, os_path, temp_port)
print("\n4. Validando migração...")
es_indices = self.get_indices_info(es_inst['port'])
os_indices = self.get_indices_info(temp_port)
print(f"Índices no Elasticsearch: {len(es_indices)}")
print(f"Índices no OpenSearch: {len(os_indices)}")
if len(os_indices) < len(es_indices):
print("\nAVISO: Nem todos os índices foram migrados!")
if input("Continuar mesmo assim? (s/N): ").lower() != 's':
return
print("\n5. Parando Elasticsearch...")
self.stop_service('elasticsearch')
print("\n6. Reconfigurando OpenSearch para porta padrão...")
self.reconfigure_port(os_path, temp_port, 9200)
print("\nMigração concluída!")
print(f"OpenSearch instalado em: {os_path}")
print("Elasticsearch foi parado mas não removido (use a opção de desinstalar se desejar)")
def migrate_via_reindex(self, source_port: int, target_port: int):
indices = self.get_indices_info(source_port)
print(f"\nMigrando {len(indices)} índices...")
for idx in indices:
if idx['index'].startswith('.'): continue
print(f"Migrando índice: {idx['index']}...", end='', flush=True)
reindex_body = {
"source": {
"remote": {
"host": f"http://localhost:{source_port}"
},
"index": idx['index']
},
"dest": {
"index": idx['index']
}
}
try:
response = requests.post(
f"http://localhost:{target_port}/_reindex",
json=reindex_body,
timeout=300
)
if response.status_code == 200:
print(" OK")
else:
print(f" ERRO: {response.text}")
except Exception as e:
print(f" ERRO: {e}")
def migrate_via_snapshot(self, source_inst: Dict, target_path: str, target_port: int):
snapshot_path = "/tmp/migration-snapshot"
repo_name = "migration-repo"
snapshot_name = f"migration-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
os.makedirs(snapshot_path, exist_ok=True)
os.chmod(snapshot_path, 0o777)
print(f"\nCriando snapshot repository...")
repo_body = {
"type": "fs",
"settings": {
"location": snapshot_path
}
}
response = requests.put(
f"http://localhost:{source_inst['port']}/_snapshot/{repo_name}",
json=repo_body
)
if response.status_code != 200:
print(f"Erro ao criar repository: {response.text}")
return
print(f"Criando snapshot...")
response = requests.put(
f"http://localhost:{source_inst['port']}/_snapshot/{repo_name}/{snapshot_name}?wait_for_completion=true"
)
if response.status_code != 200:
print(f"Erro ao criar snapshot: {response.text}")
return
print("Snapshot criado com sucesso!")
print("\nConfigurando repository no OpenSearch...")
response = requests.put(
f"http://localhost:{target_port}/_snapshot/{repo_name}",
json=repo_body
)
print("Restaurando snapshot...")
response = requests.post(
f"http://localhost:{target_port}/_snapshot/{repo_name}/{snapshot_name}/_restore"
)
if response.status_code == 200:
print("Restore iniciado com sucesso!")
else:
print(f"Erro no restore: {response.text}")
def move_installation(self, inst: Dict):
print(f"\nMovendo {inst['type']} de {inst['path']} para {self.default_target}")
if os.path.exists(self.default_target):
print(f"ERRO: Destino {self.default_target} já existe!")
return
if inst['running']:
print("Parando serviço...")
self.stop_service(inst['type'])
os.makedirs(os.path.dirname(self.default_target), exist_ok=True)
print(f"Movendo arquivos...")
shutil.move(inst['path'], self.default_target)
self.update_paths_after_move(inst['type'], inst['path'], self.default_target)
print("Reiniciando serviço...")
self.update_systemd_service(inst['type'], self.default_target)
self.start_service(inst['type'])
print(f"\nInstalação movida com sucesso para {self.default_target}")
def update_paths_after_move(self, service_type: str, old_path: str, new_path: str):
config_file = os.path.join(new_path, 'config', f'{service_type}.yml')
if os.path.exists(config_file):
with open(config_file, 'r') as f:
content = f.read()
content = content.replace(old_path, new_path)
with open(config_file, 'w') as f:
f.write(content)
env_file = os.path.join(new_path, 'bin', f'{service_type}-env')
if os.path.exists(env_file):
with open(env_file, 'r') as f:
content = f.read()
content = content.replace(old_path, new_path)
with open(env_file, 'w') as f:
f.write(content)
def upgrade_opensearch(self, inst: Dict):
print(f"\nAtualizando OpenSearch {inst['version']}...")
latest_version = self.get_latest_opensearch_version()
print(f"Versão mais recente disponível: {latest_version}")
if inst['version'] == latest_version:
print("Já está na versão mais recente!")
return
print("\nFazendo backup antes da atualização...")
backup_path = self.create_backup(inst)
if not backup_path:
print("Erro ao criar backup. Atualização cancelada.")
return
print(f"Backup salvo em: {backup_path}")
print("\nBaixando nova versão...")
temp_dir = tempfile.mkdtemp()
if not self.download_opensearch(latest_version, temp_dir):
print("Erro ao baixar nova versão")
return
if inst['running']:
print("Parando serviço...")
self.stop_service('opensearch')
print("Atualizando arquivos...")
self.update_opensearch_files(inst['path'], temp_dir, inst['version'], latest_version)
print("Reiniciando serviço...")
self.start_service('opensearch')
shutil.rmtree(temp_dir)
print(f"\nAtualização concluída! OpenSearch atualizado para versão {latest_version}")
def backup_installation(self):
if not self.installations:
print("Nenhuma instalação encontrada para backup")
return
print("\nEscolha a instalação para backup:")
for i, inst in enumerate(self.installations, 1):
print(f"{i}. {inst['type']} v{inst['version']} em {inst['path']}")
choice = input("\nEscolha: ")
try:
inst = self.installations[int(choice) - 1]
except:
print("Escolha inválida")
return
backup_path = self.create_backup(inst)
if backup_path:
print(f"\nBackup criado com sucesso em: {backup_path}")
else:
print("\nErro ao criar backup")
def create_backup(self, inst: Dict) -> Optional[str]:
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
backup_name = f"{inst['type']}-backup-{timestamp}.tar.gz"
backup_path = os.path.join("/tmp", backup_name)
print(f"Criando backup em {backup_path}...")
try:
with tarfile.open(backup_path, "w:gz") as tar:
if inst['config_path']:
config_dir = os.path.dirname(inst['config_path'])
tar.add(config_dir, arcname=f"config")
if inst['data_path'] and os.path.exists(inst['data_path']):
print("Incluindo dados no backup (pode demorar)...")
tar.add(inst['data_path'], arcname="data")
info_file = os.path.join("/tmp", "backup_info.json")
with open(info_file, 'w') as f:
json.dump(inst, f, indent=2)
tar.add(info_file, arcname="backup_info.json")
os.remove(info_file)
return backup_path
except Exception as e:
print(f"Erro ao criar backup: {e}")
return None
def uninstall(self):
if not self.installations:
print("Nenhuma instalação encontrada")
return
print("\nEscolha a instalação para desinstalar:")
for i, inst in enumerate(self.installations, 1):
status = "RODANDO" if inst['running'] else "PARADO"
print(f"{i}. {inst['type']} v{inst['version']} em {inst['path']} ({status})")
choice = input("\nEscolha: ")
try:
inst = self.installations[int(choice) - 1]
except:
print("Escolha inválida")
return
print(f"\nATENÇÃO: Isso irá remover completamente {inst['type']} incluindo:")
print("- Todos os arquivos do programa")
print("- Dados e índices")
print("- Configurações")
print("- Logs")
if inst['data_size'] > 0:
print(f"\nVolume de dados que será removido: {self.format_size(inst['data_size'])}")
if input("\nDeseja fazer backup antes de desinstalar? (s/N): ").lower() == 's':
backup_path = self.create_backup(inst)
if backup_path:
print(f"Backup salvo em: {backup_path}")
else:
if input("Backup falhou. Continuar mesmo assim? (s/N): ").lower() != 's':
return
if input("\nConfirma a desinstalação? (s/N): ").lower() != 's':
return
if inst['running']:
print("Parando serviço...")
self.stop_service(inst['type'])
print("Removendo arquivos...")
paths_to_remove = [inst['path']]
if inst['config_path'] and '/etc/' in inst['config_path']:
paths_to_remove.append(os.path.dirname(inst['config_path']))
if inst['data_path'] and inst['data_path'] not in paths_to_remove:
paths_to_remove.append(inst['data_path'])
if inst['logs_path'] and inst['logs_path'] not in paths_to_remove:
paths_to_remove.append(inst['logs_path'])
for path in paths_to_remove:
if path and path != 'unknown' and os.path.exists(path):
print(f"Removendo: {path}")
shutil.rmtree(path)
service_file = f"/etc/systemd/system/{inst['type']}.service"
if os.path.exists(service_file):
os.remove(service_file)
self.run_command(['systemctl', 'daemon-reload'])
self.run_command(['userdel', inst['type']])
print(f"\n{inst['type']} desinstalado com sucesso!")
def install_new(self):
print("\nInstalando nova versão do OpenSearch...")
if os.path.exists(self.default_target):
print(f"ERRO: Já existe uma instalação em {self.default_target}")
return
latest_version = self.get_latest_opensearch_version()
version = input(f"\nQual versão instalar? (Enter para {latest_version}): ").strip()
if not version:
version = latest_version
if self.download_and_install_opensearch(self.default_target, 9200, version):
print(f"\nOpenSearch {version} instalado com sucesso em {self.default_target}")
print("Para iniciar o serviço, execute: systemctl start opensearch")
else:
print("\nErro na instalação")
def get_latest_opensearch_version(self) -> str:
return "2.11.0"
def download_opensearch(self, version: str, dest_dir: str) -> bool:
url = f"https://artifacts.opensearch.org/releases/bundle/opensearch/{version}/opensearch-{version}-linux-x64.tar.gz"
print(f"Baixando OpenSearch {version}...")
try:
response = requests.get(url, stream=True)
response.raise_for_status()
tar_path = os.path.join(dest_dir, f"opensearch-{version}.tar.gz")
total_size = int(response.headers.get('content-length', 0))
block_size = 8192
downloaded = 0
with open(tar_path, 'wb') as f:
for chunk in response.iter_content(block_size):
downloaded += len(chunk)
f.write(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
print(f"\rProgresso: {percent:.1f}%", end='', flush=True)
print("\nExtraindo arquivos...")
with tarfile.open(tar_path, 'r:gz') as tar:
tar.extractall(dest_dir)
os.remove(tar_path)
return True
except Exception as e:
print(f"\nErro ao baixar: {e}")
return False
def download_and_install_opensearch(self, install_path: str, port: int = 9200, version: str = None) -> bool:
if not version:
version = self.get_latest_opensearch_version()
temp_dir = tempfile.mkdtemp()
try:
if not self.download_opensearch(version, temp_dir):
return False
extracted_dir = os.path.join(temp_dir, f"opensearch-{version}")
if os.path.exists(extracted_dir):
os.makedirs(os.path.dirname(install_path), exist_ok=True)
shutil.move(extracted_dir, install_path)
else:
print("Erro: diretório extraído não encontrado")
return False
self.configure_opensearch(install_path, port)
self.run_command(['useradd', '-r', '-s', '/bin/false', 'opensearch'])
self.run_command(['chown', '-R', 'opensearch:opensearch', install_path])
self.create_systemd_service('opensearch', install_path)
return True
finally:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
def configure_opensearch(self, install_path: str, port: int):
config_file = os.path.join(install_path, 'config', 'opensearch.yml')
config = f"""
cluster.name: opensearch-cluster
node.name: node-1
path.data: {install_path}/data
path.logs: {install_path}/logs
network.host: 0.0.0.0
http.port: {port}
discovery.type: single-node
plugins.security.disabled: true
"""
with open(config_file, 'w') as f:
f.write(config)
os.makedirs(os.path.join(install_path, 'data'), exist_ok=True)
os.makedirs(os.path.join(install_path, 'logs'), exist_ok=True)
def create_systemd_service(self, service_type: str, install_path: str):
service_content = f"""[Unit]
Description=OpenSearch
Documentation=https://opensearch.org/docs
Wants=network-online.target
After=network-online.target
[Service]
Type=notify
RuntimeDirectory={service_type}
PrivateTmp=true
Environment=OPENSEARCH_HOME={install_path}
Environment=OPENSEARCH_PATH_CONF={install_path}/config
WorkingDirectory={install_path}
User=opensearch
Group=opensearch
ExecStart={install_path}/bin/opensearch
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65535
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143
[Install]
WantedBy=multi-user.target
"""
service_file = f"/etc/systemd/system/{service_type}.service"
with open(service_file, 'w') as f:
f.write(service_content)
self.run_command(['systemctl', 'daemon-reload'])
def update_systemd_service(self, service_type: str, new_path: str):
service_file = f"/etc/systemd/system/{service_type}.service"
if os.path.exists(service_file):
with open(service_file, 'r') as f:
content = f.read()
lines = content.split('\n')
for i, line in enumerate(lines):
if 'Environment=OPENSEARCH_HOME=' in line or 'Environment=ELASTICSEARCH_HOME=' in line:
lines[i] = f"Environment=OPENSEARCH_HOME={new_path}"
elif 'Environment=OPENSEARCH_PATH_CONF=' in line or 'Environment=ELASTICSEARCH_PATH_CONF=' in line:
lines[i] = f"Environment=OPENSEARCH_PATH_CONF={new_path}/config"
elif 'WorkingDirectory=' in line:
lines[i] = f"WorkingDirectory={new_path}"
elif 'ExecStart=' in line:
lines[i] = f"ExecStart={new_path}/bin/{service_type}"
with open(service_file, 'w') as f:
f.write('\n'.join(lines))
self.run_command(['systemctl', 'daemon-reload'])
def stop_service(self, service_type: str):
self.run_command(['systemctl', 'stop', f'{service_type}.service'])
def start_service(self, service_type: str):
self.run_command(['systemctl', 'start', f'{service_type}.service'])
self.run_command(['systemctl', 'enable', f'{service_type}.service'])
def start_opensearch(self, install_path: str, port: int):
config_file = os.path.join(install_path, 'config', 'opensearch.yml')
self.reconfigure_port(install_path, 9200, port)
self.start_service('opensearch')
def reconfigure_port(self, install_path: str, old_port: int, new_port: int):
config_file = os.path.join(install_path, 'config', 'opensearch.yml')
if os.path.exists(config_file):
with open(config_file, 'r') as f:
content = f.read()
content = content.replace(f'http.port: {old_port}', f'http.port: {new_port}')
with open(config_file, 'w') as f:
f.write(content)
def update_opensearch_files(self, install_path: str, new_files_path: str, old_version: str, new_version: str):
config_backup = os.path.join("/tmp", "opensearch-config-backup")
shutil.copytree(os.path.join(install_path, 'config'), config_backup)
preserve_dirs = ['data', 'logs']
for item in os.listdir(install_path):
item_path = os.path.join(install_path, item)
if item not in preserve_dirs + ['config']:
if os.path.isdir(item_path):
shutil.rmtree(item_path)
else:
os.remove(item_path)
new_opensearch_dir = os.path.join(new_files_path, f"opensearch-{new_version}")
for item in os.listdir(new_opensearch_dir):
if item not in preserve_dirs:
src = os.path.join(new_opensearch_dir, item)
dst = os.path.join(install_path, item)
if os.path.isdir(src):
shutil.copytree(src, dst)
else:
shutil.copy2(src, dst)
self.merge_configs(config_backup, os.path.join(install_path, 'config'))
shutil.rmtree(config_backup)
self.run_command(['chown', '-R', 'opensearch:opensearch', install_path])
def merge_configs(self, old_config_path: str, new_config_path: str):
old_yml = os.path.join(old_config_path, 'opensearch.yml')
new_yml = os.path.join(new_config_path, 'opensearch.yml')
if os.path.exists(old_yml):
shutil.copy2(old_yml, new_yml)
def run(self):
print("=== OpenSearch/Elasticsearch Manager ===")
print("Analisando sistema...\n")
self.check_root()
self.find_installations()
while True:
if self.show_menu():
break
print("\nObrigado por usar o OpenSearch Manager!")
def main():
parser = argparse.ArgumentParser(description='Gerenciador unificado para OpenSearch e Elasticsearch')
parser.add_argument('--target', default='/opt/prod/opensearch', help='Diretório de destino para instalações')
parser.add_argument('--auto', action='store_true', help='Modo automático (sem interação)')
args = parser.parse_args()
manager = OpenSearchManager()
if args.target:
manager.default_target = args.target
try:
manager.run()
except KeyboardInterrupt:
print("\n\nOperação cancelada pelo usuário")
sys.exit(0)
except Exception as e:
print(f"\nErro: {e}")
sys.exit(1)
if __name__ == "__main__":
main()