#include "processor/operator/simple/export_db.h"
#include <sstream>
#include "catalog/catalog.h"
#include "catalog/catalog_entry/index_catalog_entry.h"
#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "catalog/catalog_entry/rel_group_catalog_entry.h"
#include "catalog/catalog_entry/sequence_catalog_entry.h"
#include "common/copier_config/csv_reader_config.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_utils.h"
#include "extension/extension_manager.h"
#include "function/scalar_macro_function.h"
#include "main/client_context.h"
#include "processor/execution_context.h"
#include "storage/buffer_manager/memory_manager.h"
#include <format>
using namespace lbug::common;
using namespace lbug::transaction;
using namespace lbug::catalog;
using namespace lbug::main;
namespace lbug {
namespace processor {
using std::stringstream;
std::string ExportDBPrintInfo::toString() const {
std::string result = "Export To: ";
result += filePath;
if (!options.empty()) {
result += ",Options: ";
auto it = options.begin();
for (auto i = 0u; it != options.end(); ++it, ++i) {
result += it->first + "=" + it->second.toString();
if (i < options.size() - 1) {
result += ", ";
}
}
}
return result;
}
static void writeStringStreamToFile(ClientContext* context, const std::string& ssString,
const std::string& path) {
const auto fileInfo = VirtualFileSystem::GetUnsafe(*context)->openFile(path,
FileOpenFlags(FileFlags::WRITE | FileFlags::CREATE_IF_NOT_EXISTS), context);
fileInfo->writeFile(reinterpret_cast<const uint8_t*>(ssString.c_str()), ssString.size(),
0 );
}
static std::string getTablePropertyDefinitions(const TableCatalogEntry* entry) {
std::string columns;
auto properties = entry->getProperties();
auto propertyIdx = 0u;
for (auto& property : properties) {
propertyIdx++;
if (property.getType() == LogicalType::INTERNAL_ID()) {
continue;
}
columns += "`" + property.getName() + "`";
columns += propertyIdx == properties.size() ? "" : ",";
}
return columns;
}
static void writeCopyNodeStatement(stringstream& ss, const TableCatalogEntry* entry,
const FileScanInfo* info,
const std::unordered_map<std::string, const std::atomic<bool>*>& canUseParallelReader) {
const auto csvConfig = CSVReaderConfig::construct(info->options);
auto fileName = entry->getName() + "." + StringUtils::getLower(info->fileTypeInfo.fileTypeStr);
std::string columns = getTablePropertyDefinitions(entry);
bool useParallelReader = true;
if (canUseParallelReader.contains(fileName)) {
useParallelReader = canUseParallelReader.at(fileName)->load();
}
auto copyOptionsCypher = CSVOption::toCypher(csvConfig.option.toOptionsMap(useParallelReader));
if (columns.empty()) {
ss << std::format("COPY `{}` FROM \"{}\" {};\n", entry->getName(), fileName,
copyOptionsCypher);
} else {
ss << std::format("COPY `{}` ({}) FROM \"{}\" {};\n", entry->getName(), columns, fileName,
copyOptionsCypher);
}
}
static void writeCopyRelStatement(stringstream& ss, const ClientContext* context,
const TableCatalogEntry* entry, const FileScanInfo* info,
const std::unordered_map<std::string, const std::atomic<bool>*>& canUseParallelReader) {
const auto csvConfig = CSVReaderConfig::construct(info->options);
std::string columns = getTablePropertyDefinitions(entry);
auto transaction = Transaction::Get(*context);
const auto catalog = Catalog::Get(*context);
for (auto& entryInfo : entry->constCast<RelGroupCatalogEntry>().getRelEntryInfos()) {
auto fromTableName =
catalog->getTableCatalogEntry(transaction, entryInfo.nodePair.srcTableID)->getName();
auto toTableName =
catalog->getTableCatalogEntry(transaction, entryInfo.nodePair.dstTableID)->getName();
auto fileName = std::format("{}_{}_{}.{}", entry->getName(), fromTableName, toTableName,
StringUtils::getLower(info->fileTypeInfo.fileTypeStr));
bool useParallelReader = true;
if (canUseParallelReader.contains(fileName)) {
useParallelReader = canUseParallelReader.at(fileName)->load();
}
auto copyOptionsMap = csvConfig.option.toOptionsMap(useParallelReader);
copyOptionsMap["from"] = std::format("'{}'", fromTableName);
copyOptionsMap["to"] = std::format("'{}'", toTableName);
auto copyOptions = CSVOption::toCypher(copyOptionsMap);
if (columns.empty()) {
ss << std::format("COPY `{}` FROM \"{}\" {};\n", entry->getName(), fileName,
copyOptions);
} else {
ss << std::format("COPY `{}` ({}) FROM \"{}\" {};\n", entry->getName(), columns,
fileName, copyOptions);
}
}
}
static void exportLoadedExtensions(stringstream& ss, const ClientContext* clientContext) {
auto extensionCypher = extension::ExtensionManager::Get(*clientContext)->toCypher();
if (!extensionCypher.empty()) {
ss << extensionCypher << std::endl;
}
}
std::string getSchemaCypher(ClientContext* clientContext) {
stringstream ss;
exportLoadedExtensions(ss, clientContext);
const auto catalog = Catalog::Get(*clientContext);
auto transaction = Transaction::Get(*clientContext);
ToCypherInfo toCypherInfo;
for (const auto& nodeTableEntry :
catalog->getNodeTableEntries(transaction, false )) {
ss << nodeTableEntry->toCypher(toCypherInfo) << std::endl;
}
RelGroupToCypherInfo relTableToCypherInfo{clientContext};
for (const auto& entry : catalog->getRelGroupEntries(transaction, false )) {
ss << entry->toCypher(relTableToCypherInfo) << std::endl;
}
RelGroupToCypherInfo relGroupToCypherInfo{clientContext};
for (const auto sequenceEntry : catalog->getSequenceEntries(transaction)) {
ss << sequenceEntry->toCypher(relGroupToCypherInfo) << std::endl;
}
for (auto macroName : catalog->getMacroNames(transaction)) {
ss << catalog->getScalarMacroFunction(transaction, macroName)->toCypher(macroName)
<< std::endl;
}
return ss.str();
}
std::string getCopyCypher(const ClientContext* context, const FileScanInfo* boundFileInfo,
const std::unordered_map<std::string, const std::atomic<bool>*>& canUseParallelReader) {
stringstream ss;
auto transaction = Transaction::Get(*context);
const auto catalog = Catalog::Get(*context);
for (const auto& nodeTableEntry :
catalog->getNodeTableEntries(transaction, false )) {
writeCopyNodeStatement(ss, nodeTableEntry, boundFileInfo, canUseParallelReader);
}
for (const auto& entry : catalog->getRelGroupEntries(transaction, false )) {
writeCopyRelStatement(ss, context, entry, boundFileInfo, canUseParallelReader);
}
return ss.str();
}
std::string getIndexCypher(ClientContext* clientContext, const FileScanInfo& exportFileInfo) {
stringstream ss;
IndexToCypherInfo info{clientContext, exportFileInfo};
auto transaction = Transaction::Get(*clientContext);
auto catalog = Catalog::Get(*clientContext);
for (auto entry : catalog->getIndexEntries(transaction)) {
auto indexCypher = entry->toCypher(info);
if (!indexCypher.empty()) {
ss << indexCypher << std::endl;
}
}
return ss.str();
}
void ExportDB::executeInternal(ExecutionContext* context) {
const auto clientContext = context->clientContext;
writeStringStreamToFile(clientContext, getSchemaCypher(clientContext),
boundFileInfo.filePaths[0] + "/" + PortDBConstants::SCHEMA_FILE_NAME);
if (schemaOnly) {
return;
}
writeStringStreamToFile(clientContext,
getCopyCypher(clientContext, &boundFileInfo, sharedState->canUseParallelReader),
boundFileInfo.filePaths[0] + "/" + PortDBConstants::COPY_FILE_NAME);
writeStringStreamToFile(clientContext, getIndexCypher(clientContext, boundFileInfo),
boundFileInfo.filePaths[0] + "/" + PortDBConstants::INDEX_FILE_NAME);
appendMessage("Exported database successfully.", storage::MemoryManager::Get(*clientContext));
}
} }