#include "binder/binder.h"
#include "binder/bound_import_database.h"
#include "common/copier_config/csv_reader_config.h"
#include "common/exception/binder.h"
#include "common/file_system/virtual_file_system.h"
#include "main/client_context.h"
#include "parser/copy.h"
#include "parser/parser.h"
#include "parser/port_db.h"
#include <format>
using namespace lbug::common;
using namespace lbug::parser;
namespace lbug {
namespace binder {
/**
 * Loads the complete contents of one file located inside an exported database directory.
 *
 * @param vfs           file system used to join the path, test existence and open the file.
 * @param boundFilePath directory that is expected to contain the file.
 * @param fileName      name of the file inside boundFilePath.
 * @param context       client context forwarded to the existence check.
 * @return the bytes of the file as a string. An empty string is returned when the file is
 *         missing and fileName is the copy file or the index file.
 * @throws BinderException when the file is missing and is neither of those two files.
 */
static std::string getQueryFromFile(VirtualFileSystem* vfs, const std::string& boundFilePath,
    const std::string& fileName, main::ClientContext* context) {
    auto fullPath = vfs->joinPath(boundFilePath, fileName);
    if (!vfs->fileOrPathExists(fullPath, context)) {
        // The copy and index files are allowed to be absent; a missing one yields no query text.
        const bool mayBeMissing = fileName == PortDBConstants::COPY_FILE_NAME ||
                                  fileName == PortDBConstants::INDEX_FILE_NAME;
        if (mayBeMissing) {
            return "";
        }
        throw BinderException(std::format("File {} does not exist.", fullPath));
    }
    auto handle = vfs->openFile(fullPath, FileOpenFlags(FileFlags::READ_ONLY
#ifdef _WIN32
                                                        | FileFlags::BINARY
#endif
                                                        ));
    const auto numBytes = handle->getFileSize();
    // Read straight into the string that is returned.
    std::string contents(numBytes, '\0');
    handle->readFile(contents.data(), numBytes);
    return contents;
}
/**
 * Renders the explicit column list of a COPY FROM statement.
 *
 * Every column name is wrapped in backticks, the names are joined with ',' and the whole list is
 * enclosed in parentheses, e.g. (`a`,`b`).
 *
 * @param copyFrom parsed COPY FROM statement whose column names are rendered.
 * @return the parenthesised column list, or an empty string when the statement names no columns.
 */
static std::string getColumnNamesToCopy(const CopyFrom& copyFrom) {
    std::string joined;
    for (const auto& name : copyFrom.getCopyColumnInfo().columnNames) {
        // A non-empty accumulator means at least one column was already written.
        if (!joined.empty()) {
            joined += ",";
        }
        joined += "`" + name + "`";
    }
    return joined.empty() ? joined : std::format("({})", joined);
}
/**
 * Resolves the path of a data file referenced by a COPY statement against a base directory.
 *
 * A path that starts with '/' or with a drive prefix (an alphabetic character followed by ':')
 * is treated as absolute and returned unchanged. Any other path is appended to boundFilePath
 * with a '/' separator.
 *
 * On Windows builds every backslash of the joined path is doubled. The absolute-path case is
 * returned as-is and is not escaped.
 * NOTE(review): the doubling is presumably needed because the result is later embedded in a
 * quoted string of a generated query -- confirm against the parser's escape rules.
 *
 * @param boundFilePath base directory used for relative paths.
 * @param filePath      path taken from the COPY statement.
 * @return filePath itself when it is absolute, otherwise the joined path.
 */
static std::string getCopyFilePath(const std::string& boundFilePath, const std::string& filePath) {
    const bool startsWithSlash = !filePath.empty() && filePath[0] == '/';
    // std::isalpha has undefined behaviour for negative arguments, which a plain char holds for
    // any non-ASCII byte (e.g. the first byte of a UTF-8 encoded file name) where char is
    // signed. Converting to unsigned char first makes the call well-defined.
    const bool startsWithDrive = filePath.size() >= 2 &&
                                 std::isalpha(static_cast<unsigned char>(filePath[0])) != 0 &&
                                 filePath[1] == ':';
    if (startsWithSlash || startsWithDrive) {
        return filePath;
    }
    auto path = boundFilePath + "/" + filePath;
#if defined(_WIN32)
    // Replace each '\' with "\\"; advance past both characters so the inserted one is not
    // visited again.
    size_t pos = 0;
    while ((pos = path.find('\\', pos)) != std::string::npos) {
        path.replace(pos, 1, "\\\\");
        pos += 2;
    }
#endif
    return path;
}
/**
 * Binds an IMPORT DATABASE statement.
 *
 * The import directory is expanded and must exist. The query text is assembled from the schema
 * file followed by one regenerated COPY statement per statement found in the copy file (if that
 * file exists). Each regenerated statement points at the data file resolved against the import
 * directory. For CSV sources the parsing options are rebuilt through CSVReaderConfig with auto
 * detection switched off; for other file types only the from/to options (when present) are
 * emitted. The contents of the index file are passed on separately.
 *
 * @param statement the parsed statement; it is cast to ImportDB.
 * @return a BoundImportDatabase holding the directory, the assembled query text and the index
 *         query text.
 * @throws BinderException when the import directory does not exist.
 */
std::unique_ptr<BoundStatement> Binder::bindImportDatabaseClause(const Statement& statement) {
    const auto& importStatement = statement.constCast<ImportDB>();
    auto vfs = VirtualFileSystem::GetUnsafe(*clientContext);
    auto boundFilePath = vfs->expandPath(clientContext, importStatement.getFilePath());
    if (!vfs->fileOrPathExists(boundFilePath, clientContext)) {
        throw BinderException(std::format("Directory {} does not exist.", boundFilePath));
    }

    // Schema statements come first, followed by the regenerated COPY statements.
    std::string assembledQuery =
        getQueryFromFile(vfs, boundFilePath, PortDBConstants::SCHEMA_FILE_NAME, clientContext);

    auto copyFileText =
        getQueryFromFile(vfs, boundFilePath, PortDBConstants::COPY_FILE_NAME, clientContext);
    if (!copyFileText.empty()) {
        auto copyStatements = Parser::parseQuery(copyFileText);
        for (auto& stmt : copyStatements) {
            DASSERT(stmt->getStatementType() == StatementType::COPY_FROM);
            auto& copyFrom = stmt->constCast<CopyFrom>();
            DASSERT(copyFrom.getSource()->type == ScanSourceType::FILE);
            auto sourcePaths = copyFrom.getSource()->constPtrCast<FileScanSource>()->filePaths;
            DASSERT(sourcePaths.size() == 1);
            auto fileTypeInfo = bindFileTypeInfo(sourcePaths);
            auto resolvedPath = getCopyFilePath(boundFilePath, sourcePaths[0]);
            auto columnList = getColumnNamesToCopy(copyFrom);
            auto parsingOptions = bindParsingOptions(copyFrom.getParsingOptions());

            // The from/to options are carried over as quoted strings and removed from the
            // parsing options before those are handed to the CSV reader configuration.
            std::unordered_map<std::string, std::string> rangeOptions;
            if (parsingOptions.contains(CopyConstants::FROM_OPTION_NAME)) {
                DASSERT(parsingOptions.contains(CopyConstants::TO_OPTION_NAME));
                rangeOptions[CopyConstants::FROM_OPTION_NAME] = std::format("'{}'",
                    parsingOptions.at(CopyConstants::FROM_OPTION_NAME).getValue<std::string>());
                rangeOptions[CopyConstants::TO_OPTION_NAME] = std::format("'{}'",
                    parsingOptions.at(CopyConstants::TO_OPTION_NAME).getValue<std::string>());
                parsingOptions.erase(CopyConstants::FROM_OPTION_NAME);
                parsingOptions.erase(CopyConstants::TO_OPTION_NAME);
            }

            // Renders the COPY statement for whichever option map applies to this source.
            const auto renderCopy = [&](auto& options) {
                return std::format("COPY `{}` {} FROM \"{}\" {};", copyFrom.getTableName(),
                    columnList, resolvedPath, CSVOption::toCypher(options));
            };

            if (fileTypeInfo.fileType == FileType::CSV) {
                auto csvConfig = CSVReaderConfig::construct(parsingOptions);
                csvConfig.option.autoDetection = false;
                auto csvOptions = csvConfig.option.toOptionsMap(csvConfig.parallel);
                if (!rangeOptions.empty()) {
                    csvOptions.insert(rangeOptions.begin(), rangeOptions.end());
                }
                assembledQuery += renderCopy(csvOptions);
            } else {
                assembledQuery += renderCopy(rangeOptions);
            }
        }
    }

    auto indexQuery =
        getQueryFromFile(vfs, boundFilePath, PortDBConstants::INDEX_FILE_NAME, clientContext);
    return std::make_unique<BoundImportDatabase>(boundFilePath, assembledQuery,
        std::move(indexQuery));
}
} }